Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split Contexts Into Separate Files #3843

Merged
merged 5 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
882 changes: 0 additions & 882 deletions internal/scheduler/context/context.go

This file was deleted.

86 changes: 86 additions & 0 deletions internal/scheduler/context/gang.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package context

import (
"time"

"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)

// GangSchedulingContext holds the scheduling contexts of all jobs that make
// up a single gang, together with aggregate information about the gang.
type GangSchedulingContext struct {
// Time at which this context was created.
Created time.Time
// Queue the gang was submitted to (read from the first job in the gang).
Queue string
// Gang id, cardinality, priority class, and node uniformity
// (read from the first job in the gang).
GangInfo
// One scheduling context per job in the gang.
JobSchedulingContexts []*JobSchedulingContext
// Sum of the resource requests over all jobs in the gang.
TotalResourceRequests schedulerobjects.ResourceList
// True iff every job in the gang is an evicted job being re-scheduled.
AllJobsEvicted bool
// True iff at least one job in the gang requests floating resources.
RequestsFloatingResources bool
}

// NewGangSchedulingContext aggregates the given per-job scheduling contexts
// into a gang scheduling context. The queue and gang info are read from the
// first element of jctxs, so jctxs must be non-empty (this function panics
// on an empty slice).
func NewGangSchedulingContext(jctxs []*JobSchedulingContext) *GangSchedulingContext {
allJobsEvicted := true
// NOTE(review): 4 is presumably a capacity hint for the number of distinct
// resource types — confirm against schedulerobjects.NewResourceList.
totalResourceRequests := schedulerobjects.NewResourceList(4)
requestsFloatingResources := false
for _, jctx := range jctxs {
// The gang counts as evicted only if every job in it is evicted.
allJobsEvicted = allJobsEvicted && jctx.IsEvicted
totalResourceRequests.AddV1ResourceList(jctx.PodRequirements.ResourceRequirements.Requests)
if jctx.Job.RequestsFloatingResources() {
requestsFloatingResources = true
}
}
// Uniformity of the values that we pick off the first job in the gang was
// checked when the jobs were submitted (e.g., in ValidateApiJobs).
representative := jctxs[0]
return &GangSchedulingContext{
Created: time.Now(),
Queue: representative.Job.Queue(),
GangInfo: representative.GangInfo,
JobSchedulingContexts: jctxs,
TotalResourceRequests: totalResourceRequests,
AllJobsEvicted: allJobsEvicted,
RequestsFloatingResources: requestsFloatingResources,
}
}

// JobIds returns a slice composed of the ids of the jobs that make up the gang.
func (gctx *GangSchedulingContext) JobIds() []string {
rv := make([]string, len(gctx.JobSchedulingContexts))
for i, jctx := range gctx.JobSchedulingContexts {
rv[i] = jctx.JobId
}
return rv
}

// Cardinality returns the number of jobs in the gang, i.e., the number of
// job scheduling contexts it holds.
func (gctx *GangSchedulingContext) Cardinality() int {
return len(gctx.JobSchedulingContexts)
}

// GangSchedulingFit summarises how well a gang was scheduled.
type GangSchedulingFit struct {
	// The number of jobs in the gang that were successfully scheduled.
	NumScheduled int
	// The mean PreemptedAtPriority among successfully scheduled pods in the gang.
	MeanPreemptedAtPriority float64
}

// Less orders fits: a fit with fewer scheduled jobs is less than one with
// more; ties are broken by MeanPreemptedAtPriority, where a higher mean
// makes the fit less.
func (f GangSchedulingFit) Less(other GangSchedulingFit) bool {
	if f.NumScheduled != other.NumScheduled {
		return f.NumScheduled < other.NumScheduled
	}
	return f.MeanPreemptedAtPriority > other.MeanPreemptedAtPriority
}

// Fit computes a GangSchedulingFit from the pod scheduling contexts of the
// jobs in the gang: the count of successfully scheduled pods and the mean
// PreemptedAtPriority over those pods (0 if none were scheduled).
func (gctx *GangSchedulingContext) Fit() GangSchedulingFit {
	numScheduled := 0
	var prioritySum int32
	for _, jctx := range gctx.JobSchedulingContexts {
		if pctx := jctx.PodSchedulingContext; pctx.IsSuccessful() {
			numScheduled++
			prioritySum += pctx.PreemptedAtPriority
		}
	}
	// Guard against dividing by zero when nothing was scheduled; prioritySum
	// is necessarily 0 in that case, so the mean is reported as 0.
	mean := float64(prioritySum)
	if numScheduled > 0 {
		mean = float64(prioritySum) / float64(numScheduled)
	}
	return GangSchedulingFit{
		NumScheduled:            numScheduled,
		MeanPreemptedAtPriority: mean,
	}
}
30 changes: 30 additions & 0 deletions internal/scheduler/context/gang_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package context

import (
"testing"

"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/api/resource"

"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
"github.com/armadaproject/armada/internal/scheduler/testfixtures"
)

// TestNewGangSchedulingContext checks that NewGangSchedulingContext stores
// the job contexts, takes the queue and gang info from the first job, and
// sums the per-job resource requests into TotalResourceRequests.
func TestNewGangSchedulingContext(t *testing.T) {
jctxs := testNSmallCpuJobSchedulingContext("A", testfixtures.TestDefaultPriorityClass, 2)
gctx := NewGangSchedulingContext(jctxs)
assert.Equal(t, jctxs, gctx.JobSchedulingContexts)
assert.Equal(t, "A", gctx.Queue)
assert.Equal(t, testfixtures.TestDefaultPriorityClass, gctx.GangInfo.PriorityClassName)
// Two small-cpu jobs, so the total is twice the per-job request
// (presumably 1 cpu / 4Gi each — see testNSmallCpuJobSchedulingContext).
assert.True(
t,
schedulerobjects.ResourceList{
Resources: map[string]resource.Quantity{
"cpu": resource.MustParse("2"),
"memory": resource.MustParse("8Gi"),
},
}.Equal(
gctx.TotalResourceRequests,
),
)
}
189 changes: 189 additions & 0 deletions internal/scheduler/context/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package context

import (
"fmt"
"strconv"
"strings"
"text/tabwriter"
"time"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"

"github.com/armadaproject/armada/internal/armada/configuration"
schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration"
"github.com/armadaproject/armada/internal/scheduler/interfaces"
"github.com/armadaproject/armada/internal/scheduler/internaltypes"
"github.com/armadaproject/armada/internal/scheduler/jobdb"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)

// JobSchedulingContext is created by the scheduler and contains information
// about the decision made by the scheduler for a particular job.
type JobSchedulingContext struct {
// Time at which this context was created.
Created time.Time
// Id of the job this pod corresponds to.
JobId string
// Indicates whether this context is for re-scheduling an evicted job.
IsEvicted bool
// Job spec.
Job *jobdb.Job
// Scheduling requirements of this job.
// We currently require that each job contains exactly one pod spec.
PodRequirements *schedulerobjects.PodRequirements
// Resource requirements in an efficient internaltypes.ResourceList
ResourceRequirements internaltypes.ResourceList
// Node selectors to consider in addition to those included with the PodRequirements.
// These are added as part of scheduling to further constrain where nodes are scheduled,
// e.g., to ensure evicted jobs are re-scheduled onto the same node.
//
// If some key appears in both PodRequirements.NodeSelector and AdditionalNodeSelectors,
// the value in AdditionalNodeSelectors trumps that of PodRequirements.NodeSelector.
AdditionalNodeSelectors map[string]string
// Tolerations to consider in addition to those included with the PodRequirements.
// These are added as part of scheduling to expand the set of nodes a job can be scheduled on.
AdditionalTolerations []v1.Toleration
// Reason for why the job could not be scheduled.
// Empty if the job was scheduled successfully.
UnschedulableReason string
// Pod scheduling context for the single pod that makes up the job
// (each job currently contains exactly one pod spec; see PodRequirements).
PodSchedulingContext *PodSchedulingContext
// GangInfo holds all the information that is necessary to schedule a gang,
// such as the lower and upper bounds on its size.
GangInfo
// This is the node the pod is assigned to.
// This is only set for evicted jobs and is set alongside adding an additionalNodeSelector for the node
AssignedNodeId string
}

// String returns a tab-aligned, human-readable summary of the job scheduling
// decision, including the pod scheduling context when one is present.
func (jctx *JobSchedulingContext) String() string {
	var buf strings.Builder
	tw := tabwriter.NewWriter(&buf, 1, 1, 1, ' ', 0)
	fmt.Fprintf(tw, "Time:\t%s\n", jctx.Created)
	fmt.Fprintf(tw, "Job ID:\t%s\n", jctx.JobId)
	// Report "none" rather than an empty cell for schedulable jobs.
	reason := "none"
	if jctx.UnschedulableReason != "" {
		reason = jctx.UnschedulableReason
	}
	fmt.Fprintf(tw, "UnschedulableReason:\t%s\n", reason)
	if jctx.PodSchedulingContext != nil {
		fmt.Fprint(tw, jctx.PodSchedulingContext.String())
	}
	tw.Flush()
	return buf.String()
}

// SchedulingKey returns the scheduling key of the embedded job.
// If the jctx carries any additional node selectors or tolerations, the
// job's precomputed key no longer describes its constraints; in that case
// an empty key and false are returned.
func (jctx *JobSchedulingContext) SchedulingKey() (schedulerobjects.SchedulingKey, bool) {
	hasAdditions := len(jctx.AdditionalNodeSelectors) > 0 || len(jctx.AdditionalTolerations) > 0
	if hasAdditions {
		return schedulerobjects.EmptySchedulingKey, false
	}
	return jctx.Job.SchedulingKey(), true
}

// IsSuccessful returns true if the job has not been marked unschedulable,
// i.e., if UnschedulableReason is empty.
func (jctx *JobSchedulingContext) IsSuccessful() bool {
return jctx.UnschedulableReason == ""
}

// Fail marks the job as unschedulable for the given reason and clears any
// node assignment recorded in its pod scheduling context.
func (jctx *JobSchedulingContext) Fail(unschedulableReason string) {
jctx.UnschedulableReason = unschedulableReason
if pctx := jctx.PodSchedulingContext; pctx != nil {
pctx.NodeId = ""
}
}

// GetAssignedNodeId returns the id of the node this job is assigned to,
// or "" if no node has been assigned (AssignedNodeId is only set for
// evicted jobs; see SetAssignedNodeId).
func (jctx *JobSchedulingContext) GetAssignedNodeId() string {
return jctx.AssignedNodeId
}

// SetAssignedNodeId records the node the job is assigned to and adds a
// NodeIdLabel node selector pinning the job to that node.
// An empty assignedNodeId is a no-op.
func (jctx *JobSchedulingContext) SetAssignedNodeId(assignedNodeId string) {
if assignedNodeId != "" {
jctx.AssignedNodeId = assignedNodeId
jctx.AddNodeSelector(schedulerconfig.NodeIdLabel, assignedNodeId)
}
}

// AddNodeSelector records an additional node selector on the context,
// lazily allocating the AdditionalNodeSelectors map on first use.
func (jctx *JobSchedulingContext) AddNodeSelector(key, value string) {
	if jctx.AdditionalNodeSelectors == nil {
		jctx.AdditionalNodeSelectors = make(map[string]string, 1)
	}
	jctx.AdditionalNodeSelectors[key] = value
}

// GangInfo holds the information necessary to schedule a gang.
type GangInfo struct {
// Unique id of the gang; "" indicates the job is not part of a gang.
Id string
// Number of jobs in the gang (1 for a job that is not in a gang).
Cardinality int
// Priority class name of the gang's jobs.
PriorityClassName string
// Value of the gang node uniformity label annotation, if any.
NodeUniformity string
}

// EmptyGangInfo returns a GangInfo for a job that is not in a gang.
// PriorityClassName and NodeUniformity are still populated from the job.
func EmptyGangInfo(job interfaces.MinimalJob) GangInfo {
return GangInfo{
// An Id of "" indicates that this job is not in a gang; we set
// Cardinality (as well as the other fields,
// which all make sense in this context) accordingly.
Id: "",
Cardinality: 1,
PriorityClassName: job.PriorityClassName(),
NodeUniformity: job.Annotations()[configuration.GangNodeUniformityLabelAnnotation],
}
}

// GangInfoFromLegacySchedulerJob extracts gang information from the job's
// annotations. A job without a gang id annotation is treated as not being
// in a gang and gets EmptyGangInfo. On error, the defaulted GangInfo is
// returned alongside the error; callers such as JobSchedulingContextFromJob
// log the error and proceed with those defaults.
func GangInfoFromLegacySchedulerJob(job interfaces.MinimalJob) (GangInfo, error) {
gangInfo := EmptyGangInfo(job)

annotations := job.Annotations()

gangId, ok := annotations[configuration.GangIdAnnotation]
if !ok {
// Not a gang job.
return gangInfo, nil
}
if gangId == "" {
return gangInfo, errors.Errorf("gang id is empty")
}

// A gang job must also declare a positive cardinality.
gangCardinalityString, ok := annotations[configuration.GangCardinalityAnnotation]
if !ok {
return gangInfo, errors.Errorf("annotation %s is missing", configuration.GangCardinalityAnnotation)
}
gangCardinality, err := strconv.Atoi(gangCardinalityString)
if err != nil {
return gangInfo, errors.WithStack(err)
}
if gangCardinality <= 0 {
return gangInfo, errors.Errorf("gang cardinality %d is non-positive", gangCardinality)
}

gangInfo.Id = gangId
gangInfo.Cardinality = gangCardinality
return gangInfo, nil
}

// JobSchedulingContextsFromJobs returns a fresh scheduling context for each
// of the given jobs, in the same order.
func JobSchedulingContextsFromJobs[J *jobdb.Job](jobs []J) []*JobSchedulingContext {
	jctxs := make([]*JobSchedulingContext, 0, len(jobs))
	for _, job := range jobs {
		jctxs = append(jctxs, JobSchedulingContextFromJob(job))
	}
	return jctxs
}

// JobSchedulingContextFromJob returns a new scheduling context for job.
// If gang info cannot be extracted from the job's annotations, the error is
// logged and the context falls back to the default (non-gang) GangInfo
// returned by GangInfoFromLegacySchedulerJob.
func JobSchedulingContextFromJob(job *jobdb.Job) *JobSchedulingContext {
gangInfo, err := GangInfoFromLegacySchedulerJob(job)
if err != nil {
// Deliberately non-fatal: scheduling proceeds with the defaulted gang info.
logrus.Errorf("failed to extract gang info from job %s: %s", job.Id(), err)
}
return &JobSchedulingContext{
Created: time.Now(),
JobId: job.Id(),
Job: job,
PodRequirements: job.PodRequirements(),
ResourceRequirements: job.EfficientResourceRequirements(),
GangInfo: gangInfo,
}
}
26 changes: 26 additions & 0 deletions internal/scheduler/context/job_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package context

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/armadaproject/armada/internal/scheduler/configuration"
)

// TestJobSchedulingContext_SetAssignedNodeId verifies that SetAssignedNodeId
// is a no-op for empty input, and that a non-empty node id is recorded and
// mirrored as a NodeIdLabel node selector.
func TestJobSchedulingContext_SetAssignedNodeId(t *testing.T) {
jctx := &JobSchedulingContext{}

// A fresh context has no assignment and no additional selectors.
assert.Equal(t, "", jctx.GetAssignedNodeId())
assert.Empty(t, jctx.AdditionalNodeSelectors)

// Will not add a node selector if input is empty
jctx.SetAssignedNodeId("")
assert.Equal(t, "", jctx.GetAssignedNodeId())
assert.Empty(t, jctx.AdditionalNodeSelectors)

jctx.SetAssignedNodeId("node1")
assert.Equal(t, "node1", jctx.GetAssignedNodeId())
assert.Len(t, jctx.AdditionalNodeSelectors, 1)
assert.Equal(t, map[string]string{configuration.NodeIdLabel: "node1"}, jctx.AdditionalNodeSelectors)
}
53 changes: 53 additions & 0 deletions internal/scheduler/context/pod.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package context

import (
	"fmt"
	"sort"
	"strings"
	"text/tabwriter"
	"time"
)

// PodSchedulingContext is returned by SelectAndBindNodeToPod and
// contains detailed information on the scheduling decision made for this pod.
type PodSchedulingContext struct {
	// Time at which this context was created.
	Created time.Time
	// ID of the node that the pod was assigned to, or empty.
	NodeId string
	// If set, indicates that the pod was scheduled on a specific node type.
	WellKnownNodeTypeName string
	// Priority this pod was most recently attempted to be scheduled at.
	// If scheduling was successful, resources were marked as allocated to the job at this priority.
	ScheduledAtPriority int32
	// Maximum priority that this pod preempted other pods at.
	PreemptedAtPriority int32
	// Total number of nodes in the cluster when trying to schedule.
	NumNodes int
	// Number of nodes excluded by reason.
	NumExcludedNodesByReason map[string]int
}

// IsSuccessful returns true if the pod was assigned a node.
// It is safe to call on a nil receiver, in which case it returns false.
func (pctx *PodSchedulingContext) IsSuccessful() bool {
	return pctx != nil && pctx.NodeId != ""
}

// String returns a tab-aligned, human-readable summary of the pod scheduling
// decision. Excluded-node reasons are sorted so the output is deterministic;
// ranging over the map directly would yield a different order on every call,
// since Go randomizes map iteration order.
func (pctx *PodSchedulingContext) String() string {
	var sb strings.Builder
	w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0)
	if pctx.NodeId != "" {
		fmt.Fprintf(w, "Node:\t%s\n", pctx.NodeId)
	} else {
		fmt.Fprint(w, "Node:\tnone\n")
	}
	fmt.Fprintf(w, "Number of nodes in cluster:\t%d\n", pctx.NumNodes)
	if len(pctx.NumExcludedNodesByReason) == 0 {
		fmt.Fprint(w, "Excluded nodes:\tnone\n")
	} else {
		fmt.Fprint(w, "Excluded nodes:\n")
		reasons := make([]string, 0, len(pctx.NumExcludedNodesByReason))
		for reason := range pctx.NumExcludedNodesByReason {
			reasons = append(reasons, reason)
		}
		sort.Strings(reasons)
		for _, reason := range reasons {
			fmt.Fprintf(w, "\t%d:\t%s\n", pctx.NumExcludedNodesByReason[reason], reason)
		}
	}
	w.Flush()
	return sb.String()
}
Loading