Skip to content

Commit

Permalink
Remove unnecessary passing of priorityClasses + remove unused fields (#3801)
Browse files Browse the repository at this point in the history

We pass priorityClasses around a lot, but they are largely unused at the bottom of the stack.
 - I've removed this parameter wherever the priorityClass is not used.

Removed a few other minor unused parameters.

Signed-off-by: JamesMurkin <[email protected]>
  • Loading branch information
JamesMurkin authored Jul 22, 2024
1 parent 7ac7f90 commit 56c9d9f
Show file tree
Hide file tree
Showing 18 changed files with 65 additions and 111 deletions.
2 changes: 1 addition & 1 deletion internal/scheduler/constraints/constraints.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func ScaleQuantity(q resource.Quantity, f float64) resource.Quantity {
return q
}

func (constraints *SchedulingConstraints) CheckRoundConstraints(sctx *schedulercontext.SchedulingContext, queue string) (bool, string, error) {
func (constraints *SchedulingConstraints) CheckRoundConstraints(sctx *schedulercontext.SchedulingContext) (bool, string, error) {
// maximumResourcesToSchedule check.
if !isStrictlyLessOrEqual(sctx.ScheduledResources.Resources, constraints.maximumResourcesToSchedule) {
return false, MaximumResourcesScheduledUnschedulableReason, nil
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/constraints/constraints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func TestConstraints(t *testing.T) {
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
ok, unscheduledReason, err := tc.constraints.CheckRoundConstraints(tc.sctx, tc.queue)
ok, unscheduledReason, err := tc.constraints.CheckRoundConstraints(tc.sctx)
require.NoError(t, err)
require.Equal(t, tc.expectedCheckRoundConstraintsReason == "", ok)
require.Equal(t, tc.expectedCheckRoundConstraintsReason, unscheduledReason)
Expand Down
11 changes: 1 addition & 10 deletions internal/scheduler/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/armadaproject/armada/internal/common/armadaerrors"
armadamaps "github.com/armadaproject/armada/internal/common/maps"
armadaslices "github.com/armadaproject/armada/internal/common/slices"
"github.com/armadaproject/armada/internal/common/types"
schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration"
"github.com/armadaproject/armada/internal/scheduler/fairness"
"github.com/armadaproject/armada/internal/scheduler/interfaces"
Expand All @@ -36,10 +35,6 @@ type SchedulingContext struct {
Finished time.Time
// Pool for which we're currently scheduling jobs.
Pool string
// Allowed priority classes.
PriorityClasses map[string]types.PriorityClass
// Default priority class.
DefaultPriorityClass string
// Determines how fairness is computed.
FairnessCostProvider fairness.FairnessCostProvider
// Limits job scheduling rate globally across all queues.
Expand Down Expand Up @@ -78,17 +73,13 @@ type SchedulingContext struct {

func NewSchedulingContext(
pool string,
priorityClasses map[string]types.PriorityClass,
defaultPriorityClass string,
fairnessCostProvider fairness.FairnessCostProvider,
limiter *rate.Limiter,
totalResources schedulerobjects.ResourceList,
) *SchedulingContext {
return &SchedulingContext{
Started: time.Now(),
Pool: pool,
PriorityClasses: priorityClasses,
DefaultPriorityClass: defaultPriorityClass,
FairnessCostProvider: fairnessCostProvider,
Limiter: limiter,
QueueSchedulingContexts: make(map[string]*QueueSchedulingContext),
Expand Down Expand Up @@ -822,7 +813,7 @@ func GangInfoFromLegacySchedulerJob(job interfaces.MinimalJob) (GangInfo, error)
return gangInfo, nil
}

func JobSchedulingContextsFromJobs[J *jobdb.Job](priorityClasses map[string]types.PriorityClass, jobs []J) []*JobSchedulingContext {
func JobSchedulingContextsFromJobs[J *jobdb.Job](jobs []J) []*JobSchedulingContext {
jctxs := make([]*JobSchedulingContext, len(jobs))
for i, job := range jobs {
jctxs[i] = JobSchedulingContextFromJob(job)
Expand Down
4 changes: 0 additions & 4 deletions internal/scheduler/context/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ func TestSchedulingContextAccounting(t *testing.T) {
require.NoError(t, err)
sctx := NewSchedulingContext(
"pool",
testfixtures.TestPriorityClasses,
testfixtures.TestDefaultPriorityClass,
fairnessCostProvider,
nil,
totalResources,
Expand Down Expand Up @@ -243,8 +241,6 @@ func TestCalculateFairShares(t *testing.T) {
require.NoError(t, err)
sctx := NewSchedulingContext(
"pool",
testfixtures.TestPriorityClasses,
testfixtures.TestDefaultPriorityClass,
fairnessCostProvider,
nil,
tc.availableResources,
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/gang_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (sch *GangScheduler) updateGangSchedulingContextOnFailure(gctx *schedulerco
func (sch *GangScheduler) Schedule(ctx *armadacontext.Context, gctx *schedulercontext.GangSchedulingContext) (ok bool, unschedulableReason string, err error) {
// Exit immediately if this is a new gang and we've hit any round limits.
if !gctx.AllJobsEvicted {
if ok, unschedulableReason, err = sch.constraints.CheckRoundConstraints(sch.schedulingContext, gctx.Queue); err != nil || !ok {
if ok, unschedulableReason, err = sch.constraints.CheckRoundConstraints(sch.schedulingContext); err != nil || !ok {
return
}
}
Expand Down
4 changes: 1 addition & 3 deletions internal/scheduler/gang_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,8 +556,6 @@ func TestGangScheduler(t *testing.T) {
require.NoError(t, err)
sctx := schedulercontext.NewSchedulingContext(
"pool",
tc.SchedulingConfig.PriorityClasses,
tc.SchedulingConfig.DefaultPriorityClassName,
fairnessCostProvider,
rate.NewLimiter(
rate.Limit(tc.SchedulingConfig.MaximumSchedulingRate),
Expand Down Expand Up @@ -593,7 +591,7 @@ func TestGangScheduler(t *testing.T) {
var actualScheduledIndices []int
scheduledGangs := 0
for i, gang := range tc.Gangs {
jctxs := schedulercontext.JobSchedulingContextsFromJobs(tc.SchedulingConfig.PriorityClasses, gang)
jctxs := schedulercontext.JobSchedulingContextsFromJobs(gang)
gctx := schedulercontext.NewGangSchedulingContext(jctxs)
ok, reason, err := sch.Schedule(armadacontext.Background(), gctx)
require.NoError(t, err)
Expand Down
19 changes: 8 additions & 11 deletions internal/scheduler/jobiteration.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/armadaproject/armada/internal/common/armadacontext"
armadaslices "github.com/armadaproject/armada/internal/common/slices"
"github.com/armadaproject/armada/internal/common/types"
schedulercontext "github.com/armadaproject/armada/internal/scheduler/context"
"github.com/armadaproject/armada/internal/scheduler/jobdb"
)
Expand Down Expand Up @@ -106,19 +105,17 @@ func (repo *InMemoryJobRepository) GetJobIterator(queue string) JobIterator {

// QueuedJobsIterator is an iterator over all jobs in a queue.
type QueuedJobsIterator struct {
repo JobRepository
jobIds []string
priorityClasses map[string]types.PriorityClass
idx int
ctx *armadacontext.Context
repo JobRepository
jobIds []string
idx int
ctx *armadacontext.Context
}

func NewQueuedJobsIterator(ctx *armadacontext.Context, queue string, repo JobRepository, priorityClasses map[string]types.PriorityClass) *QueuedJobsIterator {
func NewQueuedJobsIterator(ctx *armadacontext.Context, queue string, repo JobRepository) *QueuedJobsIterator {
return &QueuedJobsIterator{
jobIds: repo.GetQueueJobIds(queue),
repo: repo,
priorityClasses: priorityClasses,
ctx: ctx,
jobIds: repo.GetQueueJobIds(queue),
repo: repo,
ctx: ctx,
}
}

Expand Down
16 changes: 8 additions & 8 deletions internal/scheduler/jobiteration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestMultiJobsIterator_TwoQueues(t *testing.T) {
ctx := armadacontext.Background()
its := make([]JobIterator, 3)
for i, queue := range []string{"A", "B", "C"} {
it := NewQueuedJobsIterator(ctx, queue, repo, nil)
it := NewQueuedJobsIterator(ctx, queue, repo)
its[i] = it
}
it := NewMultiJobsIterator(its...)
Expand Down Expand Up @@ -93,7 +93,7 @@ func TestQueuedJobsIterator_OneQueue(t *testing.T) {
expected = append(expected, job.Id())
}
ctx := armadacontext.Background()
it := NewQueuedJobsIterator(ctx, "A", repo, nil)
it := NewQueuedJobsIterator(ctx, "A", repo)
actual := make([]string, 0)
for {
jctx, err := it.Next()
Expand All @@ -115,7 +115,7 @@ func TestQueuedJobsIterator_ExceedsBufferSize(t *testing.T) {
expected = append(expected, job.Id())
}
ctx := armadacontext.Background()
it := NewQueuedJobsIterator(ctx, "A", repo, nil)
it := NewQueuedJobsIterator(ctx, "A", repo)
actual := make([]string, 0)
for {
jctx, err := it.Next()
Expand All @@ -137,7 +137,7 @@ func TestQueuedJobsIterator_ManyJobs(t *testing.T) {
expected = append(expected, job.Id())
}
ctx := armadacontext.Background()
it := NewQueuedJobsIterator(ctx, "A", repo, nil)
it := NewQueuedJobsIterator(ctx, "A", repo)
actual := make([]string, 0)
for {
jctx, err := it.Next()
Expand All @@ -164,7 +164,7 @@ func TestCreateQueuedJobsIterator_TwoQueues(t *testing.T) {
repo.Enqueue(job)
}
ctx := armadacontext.Background()
it := NewQueuedJobsIterator(ctx, "A", repo, nil)
it := NewQueuedJobsIterator(ctx, "A", repo)
actual := make([]string, 0)
for {
jctx, err := it.Next()
Expand All @@ -187,7 +187,7 @@ func TestCreateQueuedJobsIterator_RespectsTimeout(t *testing.T) {
ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), time.Millisecond)
time.Sleep(20 * time.Millisecond)
defer cancel()
it := NewQueuedJobsIterator(ctx, "A", repo, nil)
it := NewQueuedJobsIterator(ctx, "A", repo)
job, err := it.Next()
assert.Nil(t, job)
assert.ErrorIs(t, err, context.DeadlineExceeded)
Expand All @@ -205,7 +205,7 @@ func TestCreateQueuedJobsIterator_NilOnEmpty(t *testing.T) {
repo.Enqueue(job)
}
ctx := armadacontext.Background()
it := NewQueuedJobsIterator(ctx, "A", repo, nil)
it := NewQueuedJobsIterator(ctx, "A", repo)
for job, err := it.Next(); job != nil; job, err = it.Next() {
require.NoError(t, err)
}
Expand Down Expand Up @@ -243,7 +243,7 @@ func (repo *mockJobRepository) Enqueue(job *jobdb.Job) {
}

func (repo *mockJobRepository) GetJobIterator(ctx *armadacontext.Context, queue string) JobIterator {
return NewQueuedJobsIterator(ctx, queue, repo, nil)
return NewQueuedJobsIterator(ctx, queue, repo)
}

func (repo *mockJobRepository) GetQueueJobIds(queue string) []string {
Expand Down
21 changes: 10 additions & 11 deletions internal/scheduler/nodedb/nodedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,7 @@ func (nodeDb *NodeDb) selectNodeForJobWithFairPreemption(txn *memdb.Txn, jctx *s
nodeCopy := node.node.DeepCopyNilKeys()
for _, job := range node.evictedJobs {
// Remove preempted job from node
err = nodeDb.unbindJobFromNodeInPlace(nodeDb.priorityClasses, job.JobSchedulingContext.Job, nodeCopy)
err = nodeDb.unbindJobFromNodeInPlace(job.JobSchedulingContext.Job, nodeCopy)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -942,7 +942,6 @@ func (nodeDb *NodeDb) bindJobToNodeInPlace(node *internaltypes.Node, job *jobdb.
// the jobs' priorities to evictedPriority; they are not subtracted from AllocatedByJobId and
// AllocatedByQueue.
func (nodeDb *NodeDb) EvictJobsFromNode(
priorityClasses map[string]types.PriorityClass,
jobFilter func(*jobdb.Job) bool,
jobs []*jobdb.Job,
node *internaltypes.Node,
Expand All @@ -954,15 +953,15 @@ func (nodeDb *NodeDb) EvictJobsFromNode(
continue
}
evicted = append(evicted, job)
if err := nodeDb.evictJobFromNodeInPlace(priorityClasses, job, node); err != nil {
if err := nodeDb.evictJobFromNodeInPlace(job, node); err != nil {
return nil, nil, err
}
}
return evicted, node, nil
}

// evictJobFromNodeInPlace is the in-place operation backing EvictJobsFromNode.
func (nodeDb *NodeDb) evictJobFromNodeInPlace(priorityClasses map[string]types.PriorityClass, job *jobdb.Job, node *internaltypes.Node) error {
func (nodeDb *NodeDb) evictJobFromNodeInPlace(job *jobdb.Job, node *internaltypes.Node) error {
jobId := job.Id()
if _, ok := node.AllocatedByJobId[jobId]; !ok {
return errors.Errorf("job %s has no resources allocated on node %s", jobId, node.GetId())
Expand Down Expand Up @@ -1010,27 +1009,27 @@ func markAllocatable(allocatableByPriority map[int32]internaltypes.ResourceList,
}

// UnbindJobsFromNode returns a node with all elements of jobs unbound from it.
func (nodeDb *NodeDb) UnbindJobsFromNode(priorityClasses map[string]types.PriorityClass, jobs []*jobdb.Job, node *internaltypes.Node) (*internaltypes.Node, error) {
func (nodeDb *NodeDb) UnbindJobsFromNode(jobs []*jobdb.Job, node *internaltypes.Node) (*internaltypes.Node, error) {
node = node.DeepCopyNilKeys()
for _, job := range jobs {
if err := nodeDb.unbindJobFromNodeInPlace(priorityClasses, job, node); err != nil {
if err := nodeDb.unbindJobFromNodeInPlace(job, node); err != nil {
return nil, err
}
}
return node, nil
}

// UnbindJobFromNode returns a copy of node with job unbound from it.
func (nodeDb *NodeDb) UnbindJobFromNode(priorityClasses map[string]types.PriorityClass, job *jobdb.Job, node *internaltypes.Node) (*internaltypes.Node, error) {
func (nodeDb *NodeDb) UnbindJobFromNode(job *jobdb.Job, node *internaltypes.Node) (*internaltypes.Node, error) {
node = node.DeepCopyNilKeys()
if err := nodeDb.unbindJobFromNodeInPlace(priorityClasses, job, node); err != nil {
if err := nodeDb.unbindJobFromNodeInPlace(job, node); err != nil {
return nil, err
}
return node, nil
}

// unbindPodFromNodeInPlace is like UnbindJobFromNode, but doesn't make a copy of node.
func (nodeDb *NodeDb) unbindJobFromNodeInPlace(priorityClasses map[string]types.PriorityClass, job *jobdb.Job, node *internaltypes.Node) error {
func (nodeDb *NodeDb) unbindJobFromNodeInPlace(job *jobdb.Job, node *internaltypes.Node) error {
jobId := job.Id()
requests := job.EfficientResourceRequirements()

Expand Down Expand Up @@ -1178,7 +1177,7 @@ func (nodeDb *NodeDb) AddEvictedJobSchedulingContextWithTxn(txn *memdb.Txn, inde
}

func nodeDbSchema(priorities []int32, resources []string) (*memdb.DBSchema, map[int32]string, map[int32]int) {
nodesTable, indexNameByPriority, keyIndexByPriority := nodesTableSchema(priorities, resources)
nodesTable, indexNameByPriority, keyIndexByPriority := nodesTableSchema(priorities)
evictionsTable := evictionsTableSchema()
return &memdb.DBSchema{
Tables: map[string]*memdb.TableSchema{
Expand All @@ -1188,7 +1187,7 @@ func nodeDbSchema(priorities []int32, resources []string) (*memdb.DBSchema, map[
}, indexNameByPriority, keyIndexByPriority
}

func nodesTableSchema(priorities []int32, resources []string) (*memdb.TableSchema, map[int32]string, map[int32]int) {
func nodesTableSchema(priorities []int32) (*memdb.TableSchema, map[int32]string, map[int32]int) {
indexes := make(map[string]*memdb.IndexSchema, len(priorities)+1)
indexes["id"] = &memdb.IndexSchema{
Name: "id",
Expand Down
26 changes: 13 additions & 13 deletions internal/scheduler/nodedb/nodedb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestSelectNodeForPod_NodeIdLabel_Success(t *testing.T) {
db, err := newNodeDbWithNodes(nodes)
require.NoError(t, err)
jobs := testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1)
jctxs := schedulercontext.JobSchedulingContextsFromJobs(testfixtures.TestPriorityClasses, jobs)
jctxs := schedulercontext.JobSchedulingContextsFromJobs(jobs)
for _, jctx := range jctxs {
txn := db.Txn(false)
jctx.SetAssignedNodeId(nodeId)
Expand All @@ -97,7 +97,7 @@ func TestSelectNodeForPod_NodeIdLabel_Failure(t *testing.T) {
db, err := newNodeDbWithNodes(nodes)
require.NoError(t, err)
jobs := testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1)
jctxs := schedulercontext.JobSchedulingContextsFromJobs(testfixtures.TestPriorityClasses, jobs)
jctxs := schedulercontext.JobSchedulingContextsFromJobs(jobs)
for _, jctx := range jctxs {
txn := db.Txn(false)
jctx.SetAssignedNodeId("non-existent node")
Expand Down Expand Up @@ -133,32 +133,32 @@ func TestNodeBindingEvictionUnbinding(t *testing.T) {
boundNode, err := nodeDb.bindJobToNode(entry, job, job.PodRequirements().Priority)
require.NoError(t, err)

unboundNode, err := nodeDb.UnbindJobFromNode(testfixtures.TestPriorityClasses, job, boundNode)
unboundNode, err := nodeDb.UnbindJobFromNode(job, boundNode)
require.NoError(t, err)

unboundMultipleNode, err := nodeDb.UnbindJobsFromNode(testfixtures.TestPriorityClasses, []*jobdb.Job{job}, boundNode)
unboundMultipleNode, err := nodeDb.UnbindJobsFromNode([]*jobdb.Job{job}, boundNode)
require.NoError(t, err)

evictedJobs, evictedNode, err := nodeDb.EvictJobsFromNode(testfixtures.TestPriorityClasses, jobFilter, []*jobdb.Job{job}, boundNode)
evictedJobs, evictedNode, err := nodeDb.EvictJobsFromNode(jobFilter, []*jobdb.Job{job}, boundNode)
require.NoError(t, err)
assert.Equal(t, []*jobdb.Job{job}, evictedJobs)

evictedUnboundNode, err := nodeDb.UnbindJobFromNode(testfixtures.TestPriorityClasses, job, evictedNode)
evictedUnboundNode, err := nodeDb.UnbindJobFromNode(job, evictedNode)
require.NoError(t, err)

evictedBoundNode, err := nodeDb.bindJobToNode(evictedNode, job, job.PodRequirements().Priority)
require.NoError(t, err)

_, _, err = nodeDb.EvictJobsFromNode(testfixtures.TestPriorityClasses, jobFilter, []*jobdb.Job{job}, entry)
_, _, err = nodeDb.EvictJobsFromNode(jobFilter, []*jobdb.Job{job}, entry)
require.Error(t, err)

_, err = nodeDb.UnbindJobFromNode(testfixtures.TestPriorityClasses, job, entry)
_, err = nodeDb.UnbindJobFromNode(job, entry)
require.NoError(t, err)

_, err = nodeDb.bindJobToNode(boundNode, job, job.PodRequirements().Priority)
require.Error(t, err)

_, _, err = nodeDb.EvictJobsFromNode(testfixtures.TestPriorityClasses, jobFilter, []*jobdb.Job{job}, evictedNode)
_, _, err = nodeDb.EvictJobsFromNode(jobFilter, []*jobdb.Job{job}, evictedNode)
require.Error(t, err)

assertNodeAccountingEqual(t, entry, unboundNode)
Expand Down Expand Up @@ -293,7 +293,7 @@ func TestEviction(t *testing.T) {
for i, job := range jobs {
existingJobs[i] = job
}
actualEvictions, _, err := nodeDb.EvictJobsFromNode(testfixtures.TestPriorityClasses, tc.jobFilter, existingJobs, entry)
actualEvictions, _, err := nodeDb.EvictJobsFromNode(tc.jobFilter, existingJobs, entry)
require.NoError(t, err)
expectedEvictions := make([]*jobdb.Job, 0, len(tc.expectedEvictions))
for _, i := range tc.expectedEvictions {
Expand Down Expand Up @@ -434,7 +434,7 @@ func TestScheduleIndividually(t *testing.T) {
nodeDb, err := newNodeDbWithNodes(tc.Nodes)
require.NoError(t, err)

jctxs := schedulercontext.JobSchedulingContextsFromJobs(testfixtures.TestPriorityClasses, tc.Jobs)
jctxs := schedulercontext.JobSchedulingContextsFromJobs(tc.Jobs)

for i, jctx := range jctxs {
nodeDbTxn := nodeDb.Txn(true)
Expand Down Expand Up @@ -524,7 +524,7 @@ func TestScheduleMany(t *testing.T) {
require.NoError(t, err)
for i, jobs := range tc.Jobs {
nodeDbTxn := nodeDb.Txn(true)
jctxs := schedulercontext.JobSchedulingContextsFromJobs(testfixtures.TestPriorityClasses, jobs)
jctxs := schedulercontext.JobSchedulingContextsFromJobs(jobs)
gctx := schedulercontext.NewGangSchedulingContext(jctxs)
ok, err := nodeDb.ScheduleManyWithTxn(nodeDbTxn, gctx)
require.NoError(t, err)
Expand Down Expand Up @@ -776,7 +776,7 @@ func benchmarkScheduleMany(b *testing.B, nodes []*schedulerobjects.Node, jobs []

b.ResetTimer()
for n := 0; n < b.N; n++ {
jctxs := schedulercontext.JobSchedulingContextsFromJobs(testfixtures.TestPriorityClasses, jobs)
jctxs := schedulercontext.JobSchedulingContextsFromJobs(jobs)
gctx := schedulercontext.NewGangSchedulingContext(jctxs)
txn := nodeDb.Txn(true)
_, err := nodeDb.ScheduleManyWithTxn(txn, gctx)
Expand Down
Loading

0 comments on commit 56c9d9f

Please sign in to comment.