Scheduler: fall back correctly when per-queue resource limits are partially specified (#3660)
robertdavidsmith authored Jun 7, 2024
1 parent 5602ad0 commit b7b40d9
Showing 15 changed files with 295 additions and 236 deletions.
22 changes: 5 additions & 17 deletions internal/common/util/map.go
@@ -1,7 +1,7 @@
package util

func MergeMaps(a map[string]string, b map[string]string) map[string]string {
result := make(map[string]string)
func MergeMaps[K comparable, V any](a map[K]V, b map[K]V) map[K]V {
result := make(map[K]V)
for k, v := range a {
result[k] = v
}
@@ -11,19 +11,7 @@ func MergeMaps(a map[string]string, b map[string]string) map[string]string {
return result
}

func DeepCopy(a map[string]string) map[string]string {
if a == nil {
return nil
}

result := make(map[string]string)
for k, v := range a {
result[k] = v
}
return result
}

func Equal(a map[string]string, b map[string]string) bool {
func Equal[K comparable, V comparable](a map[K]V, b map[K]V) bool {
if len(a) != len(b) {
return false
}
@@ -36,12 +24,12 @@ func Equal(a map[string]string, b map[string]string) bool {
return true
}

func FilterKeys(a map[string]string, keys []string) map[string]string {
func FilterKeys[K comparable, V any](a map[K]V, keys []K) map[K]V {
if a == nil {
return nil
}

result := make(map[string]string)
result := make(map[K]V)
for _, key := range keys {
if val, exists := a[key]; exists {
result[key] = val
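For reference, a minimal standalone sketch (not part of the commit; the tail of the function is collapsed in the diff above, so the overlay loop is reconstructed here) of how the now-generic MergeMaps behaves: values from the second map win on key conflicts, which is the property the scheduler change further down relies on when layering partially specified overrides on top of defaults.

package main

import "fmt"

// MergeMaps copies a and then overlays b, so b's values win on conflicts.
func MergeMaps[K comparable, V any](a map[K]V, b map[K]V) map[K]V {
	result := make(map[K]V)
	for k, v := range a {
		result[k] = v
	}
	for k, v := range b {
		result[k] = v
	}
	return result
}

func main() {
	defaults := map[string]float64{"cpu": 0.5, "memory": 0.5}
	override := map[string]float64{"cpu": 0.25} // partially specified
	fmt.Println(MergeMaps(defaults, override))  // map[cpu:0.25 memory:0.5]
}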
17 changes: 9 additions & 8 deletions internal/common/util/map_test.go
@@ -4,6 +4,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"golang.org/x/exp/maps"
)

func TestMergeMaps_AllValuesPresent(t *testing.T) {
@@ -51,25 +52,25 @@ func TestMergeMaps_Nil(t *testing.T) {
}
assert.Equal(t, MergeMaps(map1, nil), map1)
assert.Equal(t, MergeMaps(nil, map1), map1)
assert.Equal(t, MergeMaps(nil, nil), map[string]string{})
assert.Equal(t, MergeMaps[string, string](nil, nil), map[string]string{})
}

func TestEqual(t *testing.T) {
map1 := map[string]string{
"a": "value1",
}
map2 := DeepCopy(map1)
map2 := maps.Clone(map1)
assert.True(t, Equal(map1, map2))

map3 := DeepCopy(map1)
map3 := maps.Clone(map1)
map3["a"] = "value2"
assert.False(t, Equal(map1, map3))

map4 := DeepCopy(map1)
map4 := maps.Clone(map1)
delete(map4, "a")
assert.False(t, Equal(map1, map4))

map5 := DeepCopy(map1)
map5 := maps.Clone(map1)
map5["b"] = "value2"
assert.False(t, Equal(map1, map5))
}
@@ -80,7 +81,7 @@ func TestEqual_Nil(t *testing.T) {
}
assert.False(t, Equal(map1, nil))
assert.False(t, Equal(nil, map1))
assert.True(t, Equal(nil, nil))
assert.True(t, Equal[string, string](nil, nil))
}

func Test_FilterKeys(t *testing.T) {
@@ -94,7 +95,7 @@ func Test_FilterKeys(t *testing.T) {
}

func Test_FilterKeys_Nil(t *testing.T) {
assert.Nil(t, FilterKeys(nil, nil))
assert.Nil(t, FilterKeys(nil, []string{}))
assert.Nil(t, FilterKeys[string, string](nil, nil))
assert.Nil(t, FilterKeys[string, string](nil, []string{}))
assert.Equal(t, map[string]string{}, FilterKeys(map[string]string{"a": "b"}, nil))
}
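The explicit type parameters added to these tests (e.g. MergeMaps[string, string](nil, nil)) are required because Go cannot infer K and V from untyped nil arguments. A small sketch, not part of the commit, illustrating the same point with FilterKeys:

package main

import "fmt"

func FilterKeys[K comparable, V any](a map[K]V, keys []K) map[K]V {
	if a == nil {
		return nil
	}
	result := make(map[K]V)
	for _, key := range keys {
		if val, exists := a[key]; exists {
			result[key] = val
		}
	}
	return result
}

func main() {
	// FilterKeys(nil, nil) would not compile: neither argument pins down K or V.
	fmt.Println(FilterKeys[string, string](nil, nil)) // <nil>

	// With typed arguments, inference works as before.
	fmt.Println(FilterKeys(map[string]int{"a": 1, "b": 2}, []string{"a"})) // map[a:1]
}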
4 changes: 2 additions & 2 deletions internal/executor/fake/context/context.go
@@ -13,6 +13,7 @@ import (
"github.com/google/uuid"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"golang.org/x/exp/maps"
v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
networking "k8s.io/api/networking/v1"
@@ -24,7 +25,6 @@ import (

"github.com/armadaproject/armada/internal/common/armadacontext"
armadaresource "github.com/armadaproject/armada/internal/common/resource"
"github.com/armadaproject/armada/internal/common/util"
"github.com/armadaproject/armada/internal/executor/configuration"
cluster_context "github.com/armadaproject/armada/internal/executor/context"
)
@@ -303,7 +303,7 @@ func (c *FakeClusterContext) addNodes(specs []*NodeSpec) {
for _, s := range specs {
for i := 0; i < s.Count; i++ {
name := c.clusterId + "-" + s.Name + "-" + strconv.Itoa(i)
labels := util.DeepCopy(s.Labels)
labels := maps.Clone(s.Labels)
if labels == nil {
labels = map[string]string{}
}
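The nil check kept after this substitution is still needed: like the removed util.DeepCopy, golang.org/x/exp/maps.Clone returns nil when given a nil map. A small standalone sketch of that behaviour (the label key is illustrative, not taken from the commit):

package main

import (
	"fmt"

	"golang.org/x/exp/maps"
)

func main() {
	var specLabels map[string]string // a NodeSpec with no labels configured
	labels := maps.Clone(specLabels) // maps.Clone(nil) == nil
	if labels == nil {
		labels = map[string]string{}
	}
	labels["example.com/fake-node"] = "true" // safe to write: the map is non-nil
	fmt.Println(labels)
}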
6 changes: 3 additions & 3 deletions internal/executor/util/ingress_service_config.go
@@ -1,9 +1,9 @@
package util

import (
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"

"github.com/armadaproject/armada/internal/common/util"
"github.com/armadaproject/armada/pkg/api"
)

@@ -32,7 +32,7 @@ func deepCopy(config *IngressServiceConfig) *IngressServiceConfig {
return &IngressServiceConfig{
Type: config.Type,
Ports: slices.Clone(config.Ports),
Annotations: util.DeepCopy(config.Annotations),
Annotations: maps.Clone(config.Annotations),
TlsEnabled: config.TlsEnabled,
CertName: config.CertName,
UseClusterIp: config.UseClusterIp,
@@ -48,7 +48,7 @@ func CombineIngressService(ingresses []*api.IngressConfig, services []*api.Servi
&IngressServiceConfig{
Type: Ingress,
Ports: slices.Clone(ing.Ports),
Annotations: util.DeepCopy(ing.Annotations),
Annotations: maps.Clone(ing.Annotations),
TlsEnabled: ing.TlsEnabled,
CertName: ing.CertName,
UseClusterIp: ing.UseClusterIP,
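The same substitution in the ingress/service config: maps.Clone together with slices.Clone gives deepCopy an independent copy, so mutating the copy's Annotations or Ports cannot leak back into the original. A rough sketch under that assumption (the struct is trimmed to the two cloned fields; the annotation value is illustrative):

package main

import (
	"fmt"

	"golang.org/x/exp/maps"
	"golang.org/x/exp/slices"
)

type ingressServiceConfig struct {
	Ports       []uint32
	Annotations map[string]string
}

func deepCopy(c *ingressServiceConfig) *ingressServiceConfig {
	return &ingressServiceConfig{
		Ports:       slices.Clone(c.Ports),
		Annotations: maps.Clone(c.Annotations),
	}
}

func main() {
	original := &ingressServiceConfig{
		Ports:       []uint32{8080},
		Annotations: map[string]string{"nginx.ingress.kubernetes.io/ssl-redirect": "true"},
	}
	copied := deepCopy(original)
	copied.Annotations["nginx.ingress.kubernetes.io/ssl-redirect"] = "false"
	copied.Ports[0] = 9090

	fmt.Println(original.Annotations) // unchanged: map[nginx.ingress.kubernetes.io/ssl-redirect:true]
	fmt.Println(original.Ports)       // unchanged: [8080]
}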
126 changes: 76 additions & 50 deletions internal/scheduler/constraints/constraints.go
@@ -7,6 +7,7 @@ import (
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/api/resource"

"github.com/armadaproject/armada/internal/common/util"
"github.com/armadaproject/armada/internal/scheduler/configuration"
schedulercontext "github.com/armadaproject/armada/internal/scheduler/context"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
@@ -33,8 +34,7 @@ const (
GangExceedsGlobalBurstSizeUnschedulableReason = "gang cardinality too large: exceeds global max burst size"
GangExceedsQueueBurstSizeUnschedulableReason = "gang cardinality too large: exceeds queue max burst size"

UnschedulableReasonMaximumResourcesPerQueueExceeded = "per-queue resource limit exceeded"
UnschedulableReasonMaximumResourcesExceeded = "resource limit exceeded"
UnschedulableReasonMaximumResourcesExceeded = "resource limit exceeded"
)

// IsTerminalUnschedulableReason returns true if reason indicates
@@ -58,31 +58,31 @@ func IsTerminalQueueUnschedulableReason(reason string) bool {
// SchedulingConstraints contains scheduling constraints, e.g., per-queue resource limits.
type SchedulingConstraints struct {
// Max number of jobs to consider for a queue before giving up.
MaxQueueLookback uint
maxQueueLookBack uint
// Jobs leased to this executor must be at least this large.
// Used, e.g., to avoid scheduling CPU-only jobs onto clusters with GPUs.
MinimumJobSize schedulerobjects.ResourceList
minimumJobSize map[string]resource.Quantity
// Scheduling constraints by priority class.
PriorityClassSchedulingConstraintsByPriorityClassName map[string]PriorityClassSchedulingConstraints
priorityClassSchedulingConstraintsByPriorityClassName map[string]priorityClassSchedulingConstraints
// Scheduling constraints for specific queues.
// If present for a particular queue, global limits (i.e., PriorityClassSchedulingConstraintsByPriorityClassName)
// If present for a particular queue, global limits (i.e., priorityClassSchedulingConstraintsByPriorityClassName)
// do not apply for that queue.
QueueSchedulingConstraintsByQueueName map[string]QueueSchedulingConstraints
queueSchedulingConstraintsByQueueName map[string]queueSchedulingConstraints
// Limits total resources scheduled per invocation.
MaximumResourcesToSchedule schedulerobjects.ResourceList
maximumResourcesToSchedule map[string]resource.Quantity
}

// QueueSchedulingConstraints contains per-queue scheduling constraints.
type QueueSchedulingConstraints struct {
// queueSchedulingConstraints contains per-queue scheduling constraints.
type queueSchedulingConstraints struct {
// Scheduling constraints by priority class.
PriorityClassSchedulingConstraintsByPriorityClassName map[string]PriorityClassSchedulingConstraints
PriorityClassSchedulingConstraintsByPriorityClassName map[string]priorityClassSchedulingConstraints
}

// PriorityClassSchedulingConstraints contains scheduling constraints that apply to jobs of a specific priority class.
type PriorityClassSchedulingConstraints struct {
// priorityClassSchedulingConstraints contains scheduling constraints that apply to jobs of a specific priority class.
type priorityClassSchedulingConstraints struct {
PriorityClassName string
// Limits total resources allocated to jobs of this priority class per queue.
MaximumResourcesPerQueue schedulerobjects.ResourceList
MaximumResourcesPerQueue map[string]resource.Quantity
}

func NewSchedulingConstraints(
@@ -92,35 +92,35 @@ func NewSchedulingConstraints(
config configuration.SchedulingConfig,
queues []*api.Queue,
) SchedulingConstraints {
priorityClassSchedulingConstraintsByPriorityClassName := make(map[string]PriorityClassSchedulingConstraints, len(config.PriorityClasses))
priorityClassSchedulingConstraintsByPriorityClassName := make(map[string]priorityClassSchedulingConstraints, len(config.PriorityClasses))
for name, priorityClass := range config.PriorityClasses {
maximumResourceFractionPerQueue := priorityClass.MaximumResourceFractionPerQueue
if m, ok := priorityClass.MaximumResourceFractionPerQueueByPool[pool]; ok {
// Use pool-specific config is available.
maximumResourceFractionPerQueue = m
maximumResourceFractionPerQueue = util.MergeMaps(maximumResourceFractionPerQueue, m)
}
priorityClassSchedulingConstraintsByPriorityClassName[name] = PriorityClassSchedulingConstraints{
priorityClassSchedulingConstraintsByPriorityClassName[name] = priorityClassSchedulingConstraints{
PriorityClassName: name,
MaximumResourcesPerQueue: absoluteFromRelativeLimits(totalResources, maximumResourceFractionPerQueue),
MaximumResourcesPerQueue: absoluteFromRelativeLimits(totalResources.Resources, maximumResourceFractionPerQueue),
}
}

queueSchedulingConstraintsByQueueName := make(map[string]QueueSchedulingConstraints, len(queues))
queueSchedulingConstraintsByQueueName := make(map[string]queueSchedulingConstraints, len(queues))
for _, queue := range queues {
priorityClassSchedulingConstraintsByPriorityClassNameForQueue := make(map[string]PriorityClassSchedulingConstraints, len(queue.ResourceLimitsByPriorityClassName))
priorityClassSchedulingConstraintsByPriorityClassNameForQueue := make(map[string]priorityClassSchedulingConstraints, len(queue.ResourceLimitsByPriorityClassName))
for priorityClassName, priorityClassResourceLimits := range queue.ResourceLimitsByPriorityClassName {
maximumResourceFraction := priorityClassResourceLimits.MaximumResourceFraction
if m, ok := priorityClassResourceLimits.MaximumResourceFractionByPool[pool]; ok {
// Use pool-specific maximum resource fraction if available.
maximumResourceFraction = m.MaximumResourceFraction
maximumResourceFraction = util.MergeMaps(maximumResourceFraction, m.MaximumResourceFraction)
}
priorityClassSchedulingConstraintsByPriorityClassNameForQueue[priorityClassName] = PriorityClassSchedulingConstraints{
priorityClassSchedulingConstraintsByPriorityClassNameForQueue[priorityClassName] = priorityClassSchedulingConstraints{
PriorityClassName: priorityClassName,
MaximumResourcesPerQueue: absoluteFromRelativeLimits(totalResources, maximumResourceFraction),
MaximumResourcesPerQueue: absoluteFromRelativeLimits(totalResources.Resources, maximumResourceFraction),
}
}
if len(priorityClassSchedulingConstraintsByPriorityClassNameForQueue) > 0 {
queueSchedulingConstraintsByQueueName[queue.Name] = QueueSchedulingConstraints{
queueSchedulingConstraintsByQueueName[queue.Name] = queueSchedulingConstraints{
PriorityClassSchedulingConstraintsByPriorityClassName: priorityClassSchedulingConstraintsByPriorityClassNameForQueue,
}
}
@@ -132,18 +132,18 @@ func NewSchedulingConstraints(
maximumResourceFractionToSchedule = m
}
return SchedulingConstraints{
MaxQueueLookback: config.MaxQueueLookback,
MinimumJobSize: minimumJobSize,
MaximumResourcesToSchedule: absoluteFromRelativeLimits(totalResources, maximumResourceFractionToSchedule),
PriorityClassSchedulingConstraintsByPriorityClassName: priorityClassSchedulingConstraintsByPriorityClassName,
QueueSchedulingConstraintsByQueueName: queueSchedulingConstraintsByQueueName,
maxQueueLookBack: config.MaxQueueLookback,
minimumJobSize: minimumJobSize.Resources,
maximumResourcesToSchedule: absoluteFromRelativeLimits(totalResources.Resources, maximumResourceFractionToSchedule),
priorityClassSchedulingConstraintsByPriorityClassName: priorityClassSchedulingConstraintsByPriorityClassName,
queueSchedulingConstraintsByQueueName: queueSchedulingConstraintsByQueueName,
}
}

func absoluteFromRelativeLimits(totalResources schedulerobjects.ResourceList, relativeLimits map[string]float64) schedulerobjects.ResourceList {
absoluteLimits := schedulerobjects.NewResourceList(len(relativeLimits))
func absoluteFromRelativeLimits(totalResources map[string]resource.Quantity, relativeLimits map[string]float64) map[string]resource.Quantity {
absoluteLimits := make(map[string]resource.Quantity, len(relativeLimits))
for t, f := range relativeLimits {
absoluteLimits.Set(t, ScaleQuantity(totalResources.Get(t).DeepCopy(), f))
absoluteLimits[t] = ScaleQuantity(totalResources[t].DeepCopy(), f)
}
return absoluteLimits
}
Expand All @@ -157,8 +157,8 @@ func ScaleQuantity(q resource.Quantity, f float64) resource.Quantity {
}

func (constraints *SchedulingConstraints) CheckRoundConstraints(sctx *schedulercontext.SchedulingContext, queue string) (bool, string, error) {
// MaximumResourcesToSchedule check.
if !sctx.ScheduledResources.IsStrictlyLessOrEqual(constraints.MaximumResourcesToSchedule) {
// maximumResourcesToSchedule check.
if !isStrictlyLessOrEqual(sctx.ScheduledResources.Resources, constraints.maximumResourcesToSchedule) {
return false, MaximumResourcesScheduledUnschedulableReason, nil
}
return true, "", nil
@@ -174,7 +174,7 @@ func (constraints *SchedulingConstraints) CheckConstraints(
}

// Check that the job is large enough for this executor.
if ok, unschedulableReason := RequestsAreLargeEnough(gctx.TotalResourceRequests, constraints.MinimumJobSize); !ok {
if ok, unschedulableReason := RequestsAreLargeEnough(gctx.TotalResourceRequests.Resources, constraints.minimumJobSize); !ok {
return false, unschedulableReason, nil
}

@@ -202,30 +202,56 @@ func (constraints *SchedulingConstraints) CheckConstraints(
return false, QueueRateLimitExceededByGangUnschedulableReason, nil
}

// QueueSchedulingConstraintsByQueueName / PriorityClassSchedulingConstraintsByPriorityClassName checks.
if queueConstraint, ok := constraints.QueueSchedulingConstraintsByQueueName[gctx.Queue]; ok {
// queueSchedulingConstraintsByQueueName / priorityClassSchedulingConstraintsByPriorityClassName checks.
queueAndPriorityClassResourceLimits := constraints.getQueueAndPriorityClassResourceLimits(gctx)
priorityClassResourceLimits := constraints.getPriorityClassResourceLimits(gctx)
overallResourceLimits := util.MergeMaps(priorityClassResourceLimits, queueAndPriorityClassResourceLimits)
if !isStrictlyLessOrEqual(qctx.AllocatedByPriorityClass[gctx.PriorityClassName].Resources, overallResourceLimits) {
return false, UnschedulableReasonMaximumResourcesExceeded, nil
}

return true, "", nil
}

func (constraints *SchedulingConstraints) getQueueAndPriorityClassResourceLimits(gctx *schedulercontext.GangSchedulingContext) map[string]resource.Quantity {
if queueConstraint, ok := constraints.queueSchedulingConstraintsByQueueName[gctx.Queue]; ok {
if priorityClassConstraint, ok := queueConstraint.PriorityClassSchedulingConstraintsByPriorityClassName[gctx.PriorityClassName]; ok {
if !qctx.AllocatedByPriorityClass[gctx.PriorityClassName].IsStrictlyLessOrEqual(priorityClassConstraint.MaximumResourcesPerQueue) {
return false, UnschedulableReasonMaximumResourcesPerQueueExceeded, nil
}
}
} else {
if priorityClassConstraint, ok := constraints.PriorityClassSchedulingConstraintsByPriorityClassName[gctx.PriorityClassName]; ok {
if !qctx.AllocatedByPriorityClass[gctx.PriorityClassName].IsStrictlyLessOrEqual(priorityClassConstraint.MaximumResourcesPerQueue) {
return false, UnschedulableReasonMaximumResourcesExceeded, nil
}
return priorityClassConstraint.MaximumResourcesPerQueue
}
}
return map[string]resource.Quantity{}
}

return true, "", nil
func (constraints *SchedulingConstraints) getPriorityClassResourceLimits(gctx *schedulercontext.GangSchedulingContext) map[string]resource.Quantity {
if priorityClassConstraint, ok := constraints.priorityClassSchedulingConstraintsByPriorityClassName[gctx.PriorityClassName]; ok {
return priorityClassConstraint.MaximumResourcesPerQueue
}
return map[string]resource.Quantity{}
}

func RequestsAreLargeEnough(totalResourceRequests, minRequest schedulerobjects.ResourceList) (bool, string) {
for t, minQuantity := range minRequest.Resources {
q := totalResourceRequests.Get(t)
func RequestsAreLargeEnough(totalResourceRequests, minRequest map[string]resource.Quantity) (bool, string) {
for t, minQuantity := range minRequest {
q := totalResourceRequests[t]
if minQuantity.Cmp(q) == 1 {
return false, fmt.Sprintf("job requests %s %s, but the minimum is %s", q.String(), t, minQuantity.String())
}
}
return true, ""
}

func (constraints *SchedulingConstraints) GetMaxQueueLookBack() uint {
return constraints.maxQueueLookBack
}

// isStrictlyLessOrEqual returns false if
// - there is a quantity in b greater than that in a or
// - there is a non-zero quantity in b not in a
// and true otherwise.
func isStrictlyLessOrEqual(a map[string]resource.Quantity, b map[string]resource.Quantity) bool {
for t, q := range b {
if q.Cmp(a[t]) == -1 {
return false
}
}
return true
}
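The behavioural heart of the commit is in CheckConstraints: rather than using a queue's own limits exclusively whenever any are set (the old else branch), the queue-specific limits are now merged over the global per-priority-class limits, so a resource the queue override does not mention falls back to the global limit instead of being treated as unlimited. A rough standalone sketch of that fallback (names and quantities are illustrative; the real code works on the scheduler's context types):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

// mergeMaps mirrors util.MergeMaps: values from the second map win on conflicts.
func mergeMaps[K comparable, V any](a, b map[K]V) map[K]V {
	result := make(map[K]V, len(a)+len(b))
	for k, v := range a {
		result[k] = v
	}
	for k, v := range b {
		result[k] = v
	}
	return result
}

// isStrictlyLessOrEqual mirrors the helper above: every quantity that has a
// limit in b must be at least as large as the corresponding allocation in a;
// resources with no entry in b are left unconstrained.
func isStrictlyLessOrEqual(a, b map[string]resource.Quantity) bool {
	for t, q := range b {
		if q.Cmp(a[t]) == -1 {
			return false
		}
	}
	return true
}

func main() {
	// Global per-priority-class limits for a queue.
	global := map[string]resource.Quantity{
		"cpu":    resource.MustParse("100"),
		"memory": resource.MustParse("1Ti"),
	}
	// Queue-level override that only specifies cpu. Previously the override
	// replaced the global limits entirely, leaving memory unlimited; now the
	// unspecified memory limit falls back to the global value.
	queueOverride := map[string]resource.Quantity{
		"cpu": resource.MustParse("50"),
	}
	limits := mergeMaps(global, queueOverride)

	allocated := map[string]resource.Quantity{
		"cpu":    resource.MustParse("40"),
		"memory": resource.MustParse("2Ti"),
	}
	fmt.Println(isStrictlyLessOrEqual(allocated, limits)) // false: memory exceeds the fallback limit
}

The same merge-over-defaults idea appears earlier in NewSchedulingConstraints, where pool-specific resource-fraction maps are now merged over the default fractions instead of replacing them wholesale.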