Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove priority from PodRequirements #3808

Merged
merged 8 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions config/scheduleringester/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,5 @@ pulsar:
subscriptionName: "scheduler-ingester"
batchSize: 10000
batchDuration: 500ms
priorityClasses:
armada-default:
priority: 1000
armada-preemptible:
priority: 1000


1 change: 0 additions & 1 deletion internal/armada/event/conversion/conversions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,6 @@ func TestConvertLeased(t *testing.T) {
Effect: v1.TaintEffectNoSchedule,
},
},
Priority: 1000,
},
},
},
Expand Down
5 changes: 0 additions & 5 deletions internal/common/ingest/testfixtures/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/armadaproject/armada/internal/armada/configuration"
protoutil "github.com/armadaproject/armada/internal/common/proto"
"github.com/armadaproject/armada/internal/common/types"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
"github.com/armadaproject/armada/internal/scheduler/testfixtures"
"github.com/armadaproject/armada/pkg/armadaevents"
Expand All @@ -31,8 +30,6 @@ var (
RunIdUuid = armadaevents.UuidFromProtoUuid(RunIdProto)
PartitionMarkerGroupIdUuid = armadaevents.UuidFromProtoUuid(PartitionMarkerGroupIdProto)
PriorityClassName = "test-priority"
PriorityClassValue = int32(100)
PriorityClasses = map[string]types.PriorityClass{PriorityClassName: {Priority: PriorityClassValue}}
Groups = []string{"group1", "group2"}
NodeSelector = map[string]string{"foo": "bar"}
Affinity = &v1.Affinity{
Expand Down Expand Up @@ -227,7 +224,6 @@ var Leased = &armadaevents.EventSequence_Event{
Effect: v1.TaintEffectNoSchedule,
},
},
Priority: 15,
},
},
},
Expand Down Expand Up @@ -347,7 +343,6 @@ var JobRequeued = &armadaevents.EventSequence_Event{
NodeSelector: NodeSelector,
Tolerations: Tolerations,
PreemptionPolicy: "PreemptLowerPriority",
Priority: PriorityClassValue,
Affinity: Affinity,
ResourceRequirements: v1.ResourceRequirements{
Limits: map[v1.ResourceName]resource.Quantity{
Expand Down
41 changes: 1 addition & 40 deletions internal/scheduler/adapters/adapters.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,17 @@
package adapters

import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
k8sResource "k8s.io/apimachinery/pkg/api/resource"

"github.com/armadaproject/armada/internal/common/logging"
"github.com/armadaproject/armada/internal/common/types"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
"github.com/armadaproject/armada/pkg/api"
)

// PodRequirementsFromPodSpec function returns *schedulerobjects.PodRequirements for podSpec.
// An error is logged if the podSpec uses an unknown priority class.
// This function may mutate podSpec.
func PodRequirementsFromPodSpec(podSpec *v1.PodSpec, priorityByPriorityClassName map[string]types.PriorityClass) *schedulerobjects.PodRequirements {
priority, ok := PriorityFromPodSpec(podSpec, priorityByPriorityClassName)
if priorityByPriorityClassName != nil && !ok {
// Ignore this error if priorityByPriorityClassName is explicitly set to nil.
// We assume that in this case the caller is sure the priority does not need to be set.
err := errors.Errorf("unknown priorityClassName %s", podSpec.PriorityClassName)
logging.WithStacktrace(logrus.NewEntry(logrus.StandardLogger()), err).Error("failed to get priority from priorityClassName")
}
func PodRequirementsFromPodSpec(podSpec *v1.PodSpec) *schedulerobjects.PodRequirements {
preemptionPolicy := string(v1.PreemptLowerPriority)
if podSpec.PreemptionPolicy != nil {
preemptionPolicy = string(*podSpec.PreemptionPolicy)
Expand All @@ -31,39 +20,11 @@ func PodRequirementsFromPodSpec(podSpec *v1.PodSpec, priorityByPriorityClassName
NodeSelector: podSpec.NodeSelector,
Affinity: podSpec.Affinity,
Tolerations: podSpec.Tolerations,
Priority: priority,
PreemptionPolicy: preemptionPolicy,
ResourceRequirements: api.SchedulingResourceRequirementsFromPodSpec(podSpec),
}
}

// PriorityFromPodSpec returns the priority in a pod spec.
// If priority is set directly, that value is returned.
// Otherwise, it returns the value of the key podSpec.
// In both cases the value along with true boolean is returned.
// PriorityClassName in priorityByPriorityClassName map.
// If no priority is set for the pod spec, 0 along with a false boolean would be returned
func PriorityFromPodSpec(podSpec *v1.PodSpec, priorityClasses map[string]types.PriorityClass) (int32, bool) {
// If there's no podspec there's nothing we can do
if podSpec == nil {
return 0, false
}

// If a priority is directly specified, use that
if podSpec.Priority != nil {
return *podSpec.Priority, true
}

// If we find a priority class use that
priorityClass, ok := priorityClasses[podSpec.PriorityClassName]
if ok {
return priorityClass.Priority, true
}

// Couldn't find anything
return 0, false
}

func K8sResourceListToMap(resources v1.ResourceList) map[string]k8sResource.Quantity {
if resources == nil {
return nil
Expand Down
146 changes: 1 addition & 145 deletions internal/scheduler/adapters/adapters_test.go
Original file line number Diff line number Diff line change
@@ -1,36 +1,14 @@
package adapters

import (
"io"
"os"
"testing"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/utils/pointer"

"github.com/armadaproject/armada/internal/common/types"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)

const (
PriorityClass0 = "priority-0"
PriorityClass1 = "priority-1"
PriorityClass2 = "priority-2"
PriorityClass3 = "priority-3"
)

var (
priorityByPriorityClassName = map[string]types.PriorityClass{
PriorityClass0: {Priority: 0, Preemptible: true},
PriorityClass1: {Priority: 1, Preemptible: true},
PriorityClass2: {Priority: 2, Preemptible: true},
PriorityClass3: {Priority: 3, Preemptible: false},
}

priority int32 = 1

containerObj = []v1.Container{
Expand All @@ -47,87 +25,8 @@ var (
},
},
}

expectedResourceRequirement = v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceName("cpu"): *resource.NewMilliQuantity(5300, resource.DecimalSI),
v1.ResourceName("memory"): *resource.NewQuantity(5*1024*1024*1024, resource.BinarySI),
},
Requests: v1.ResourceList{
v1.ResourceName("cpu"): *resource.NewMilliQuantity(300, resource.DecimalSI),
v1.ResourceName("memory"): *resource.NewQuantity(2*1024*1024*1024, resource.BinarySI),
},
}
expectedScheduler = &schedulerobjects.PodRequirements{
ResourceRequirements: expectedResourceRequirement,
PreemptionPolicy: string(v1.PreemptLowerPriority),
}
)

func TestPodRequirementsFromPodSpecPriorityByPriorityClassName(t *testing.T) {
tests := []struct {
name string
podspec v1.PodSpec
priorityByPriorityClassName map[string]types.PriorityClass
loggedError bool
priority int32
}{
{
name: "PriorityClassName not present in priorityByPriorityClassName map",
podspec: v1.PodSpec{
PriorityClassName: "priority-8",
Containers: containerObj,
},
priorityByPriorityClassName: priorityByPriorityClassName,
loggedError: true,
priority: 0,
},
{
name: "PriorityByPriorityClassName map is nil",
podspec: v1.PodSpec{
PriorityClassName: "priority-3",
Containers: containerObj,
},
priorityByPriorityClassName: nil,
loggedError: false,
priority: 0,
},
{
name: "Priority is set directly on podspec",
podspec: v1.PodSpec{
Priority: &priority,
Containers: containerObj,
},
priorityByPriorityClassName: priorityByPriorityClassName,
loggedError: false,
priority: priority,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
r, w, _ := os.Pipe()
// Stderr from this function would be written to file w
logrus.SetOutput(w)
scheduler := PodRequirementsFromPodSpec(&test.podspec, test.priorityByPriorityClassName)
// Closing file, w
err := w.Close()
require.NoError(t, err)
// Reading from file
out, _ := io.ReadAll(r)
expectedScheduler.Priority = test.priority
assert.Equal(t, scheduler, expectedScheduler)
// if loggedError is true, bytes should be written to stderr,
// Otherwise, no byte is expected to be written to stderr
if test.loggedError {
assert.NotEqual(t, len(out), 0)
} else {
assert.Equal(t, len(out), 0)
}
})
}
}

func TestPodRequirementsFromPodSpecPreemptionPolicy(t *testing.T) {
preemptNever := v1.PreemptNever
tests := []struct {
Expand Down Expand Up @@ -157,55 +56,12 @@ func TestPodRequirementsFromPodSpecPreemptionPolicy(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
scheduler := PodRequirementsFromPodSpec(&test.podspec, priorityByPriorityClassName)
scheduler := PodRequirementsFromPodSpec(&test.podspec)
assert.Equal(t, scheduler.PreemptionPolicy, string(test.preemptionpolicy))
})
}
}

func TestPriorityFromPodSpec(t *testing.T) {
tests := map[string]struct {
podSpec *v1.PodSpec
expectedPriority int32
expectedOk bool
}{
"nil podSpec": {
podSpec: nil,
expectedPriority: 0,
expectedOk: false,
},
"priority already set": {
podSpec: &v1.PodSpec{
Priority: pointer.Int32(1),
PriorityClassName: PriorityClass2,
},
expectedPriority: 1,
expectedOk: true,
},
"existing priorityClass": {
podSpec: &v1.PodSpec{
PriorityClassName: PriorityClass2,
},
expectedPriority: 2,
expectedOk: true,
},
"non-existing priorityClass": {
podSpec: &v1.PodSpec{
PriorityClassName: "does not exist",
},
expectedPriority: 0,
expectedOk: false,
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
p, ok := PriorityFromPodSpec(tc.podSpec, priorityByPriorityClassName)
assert.Equal(t, tc.expectedPriority, p)
assert.Equal(t, tc.expectedOk, ok)
})
}
}

func TestK8sResourceListToMap(t *testing.T) {
result := K8sResourceListToMap(v1.ResourceList{
"one": resource.MustParse("1"),
Expand Down
1 change: 0 additions & 1 deletion internal/scheduler/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,6 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) {
&schedulerobjects.PodRequirements{
Tolerations: tolerations,
Annotations: map[string]string{"runtime_gang_cardinality": "3"},
Priority: 1000,
},
),
}
Expand Down
1 change: 0 additions & 1 deletion internal/scheduler/database/job_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,6 @@ func TestFetchJobRunLeases(t *testing.T) {
Effect: v1.TaintEffectNoSchedule,
},
},
Priority: 1000,
},
),
},
Expand Down
Loading