Skip to content

Commit

Permalink
Scheduler: Add floating resource support (#3767)
Browse files Browse the repository at this point in the history
  • Loading branch information
robertdavidsmith authored Jul 3, 2024
1 parent 01333d8 commit 91a346f
Show file tree
Hide file tree
Showing 33 changed files with 664 additions and 77 deletions.
28 changes: 28 additions & 0 deletions internal/common/maps/maps.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@ package maps

import "github.com/armadaproject/armada/internal/common/interfaces"

// FromSlice maps element e of slice into map entry keyFunc(e): valueFunc(e)
func FromSlice[S []E, E any, K comparable, V any](slice S, keyFunc func(E) K, valueFunc func(E) V) map[K]V {
rv := make(map[K]V, len(slice))
for _, elem := range slice {
rv[keyFunc(elem)] = valueFunc(elem)
}
return rv
}

// MapValues maps the values of m into valueFunc(v).
func MapValues[M ~map[K]VA, K comparable, VA any, VB any](m M, valueFunc func(VA) VB) map[K]VB {
rv := make(map[K]VB, len(m))
Expand Down Expand Up @@ -75,3 +84,22 @@ func Filter[M ~map[K]V, K comparable, V any](m M, predicate func(K, V) bool) M {
}
return rv
}

// RemoveInPlace removes elements that match keySelector
func RemoveInPlace[K comparable, V any](m map[K]V, keySelector func(K) bool) {
for k := range m {
if keySelector(k) {
delete(m, k)
}
}
}

func Keys[K comparable, V any](m map[K]V) []K {
i := 0
result := make([]K, len(m))
for k := range m {
result[i] = k
i++
}
return result
}
42 changes: 42 additions & 0 deletions internal/common/maps/maps_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,26 @@
package maps

import (
"strconv"
"testing"

"github.com/stretchr/testify/assert"
"golang.org/x/exp/slices"
)

func TestFromSlice(t *testing.T) {
actual := FromSlice(
[]int{1, 2},
func(elem int) string { return strconv.Itoa(elem) },
func(elem int) float64 { return float64(elem) * 0.1 },
)
expected := map[string]float64{
"1": 0.1,
"2": 0.2,
}
assert.Equal(t, expected, actual)
}

func TestMapKeys(t *testing.T) {
m := map[string][]int{
"foo": {1, 2, 3},
Expand Down Expand Up @@ -176,3 +190,31 @@ func TestFilterKeys(t *testing.T) {
}
assert.Equal(t, expected, actual)
}

func TestRemoveInPlace(t *testing.T) {
m := map[int]string{
1: "one",
2: "two",
3: "three",
4: "four",
}
RemoveInPlace(m, func(i int) bool {
return i > 2
})
expected := map[int]string{
1: "one",
2: "two",
}
assert.Equal(t, expected, m)
}

func TestKeys(t *testing.T) {
m := map[int]string{
1: "one",
2: "two",
}
result := Keys(m)
slices.Sort(result)
expected := []int{1, 2}
assert.Equal(t, expected, result)
}
12 changes: 12 additions & 0 deletions internal/scheduler/adapters/adapters.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
k8sResource "k8s.io/apimachinery/pkg/api/resource"

"github.com/armadaproject/armada/internal/common/logging"
"github.com/armadaproject/armada/internal/common/types"
Expand Down Expand Up @@ -62,3 +63,14 @@ func PriorityFromPodSpec(podSpec *v1.PodSpec, priorityClasses map[string]types.P
// Couldn't find anything
return 0, false
}

func K8sResourceListToMap(resources v1.ResourceList) map[string]k8sResource.Quantity {
if resources == nil {
return nil
}
result := make(map[string]k8sResource.Quantity, len(resources))
for k, v := range resources {
result[string(k)] = v
}
return result
}
17 changes: 17 additions & 0 deletions internal/scheduler/adapters/adapters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,3 +205,20 @@ func TestPriorityFromPodSpec(t *testing.T) {
})
}
}

func TestK8sResourceListToMap(t *testing.T) {
result := K8sResourceListToMap(v1.ResourceList{
"one": resource.MustParse("1"),
"two": resource.MustParse("2"),
})
expected := map[string]resource.Quantity{
"one": resource.MustParse("1"),
"two": resource.MustParse("2"),
}

assert.Equal(t, expected, result)
}

func TestK8sResourceListToMap_PreservesNil(t *testing.T) {
assert.Nil(t, K8sResourceListToMap(nil))
}
33 changes: 32 additions & 1 deletion internal/scheduler/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/armadaproject/armada/internal/common/armadacontext"
"github.com/armadaproject/armada/internal/common/compress"
"github.com/armadaproject/armada/internal/common/logging"
"github.com/armadaproject/armada/internal/common/maps"
"github.com/armadaproject/armada/internal/common/pulsarutils"
"github.com/armadaproject/armada/internal/common/slices"
priorityTypes "github.com/armadaproject/armada/internal/common/types"
Expand All @@ -40,6 +41,9 @@ type ExecutorApi struct {
allowedPriorities []int32
// Known priority classes
priorityClasses map[string]priorityTypes.PriorityClass
// Allowed resource names - resource requests/limits not on this list are dropped.
// This is needed to ensure floating resources are not passed to k8s.
allowedResources map[string]bool
// Max number of events in published Pulsar messages
maxEventsPerPulsarMessage int
// Max size of Pulsar messages produced.
Expand All @@ -55,6 +59,7 @@ func NewExecutorApi(producer pulsar.Producer,
jobRepository database.JobRepository,
executorRepository database.ExecutorRepository,
allowedPriorities []int32,
allowedResources []string,
nodeIdLabel string,
priorityClassNameOverride *string,
priorityClasses map[string]priorityTypes.PriorityClass,
Expand All @@ -69,6 +74,7 @@ func NewExecutorApi(producer pulsar.Producer,
jobRepository: jobRepository,
executorRepository: executorRepository,
allowedPriorities: allowedPriorities,
allowedResources: maps.FromSlice(allowedResources, func(name string) string { return name }, func(name string) bool { return true }),
maxEventsPerPulsarMessage: maxEventsPerPulsarMessage,
maxPulsarMessageSizeBytes: maxPulsarMessageSizeBytes,
nodeIdLabel: nodeIdLabel,
Expand Down Expand Up @@ -109,7 +115,7 @@ func (srv *ExecutorApi) LeaseJobRuns(stream executorapi.ExecutorApi_LeaseJobRuns
return err
}
ctx.Infof(
"executor currently has %d job runs; sending %d cancellations and %d new runs",
"Executor currently has %d job runs; sending %d cancellations and %d new runs",
len(requestRuns), len(runsToCancel), len(newRuns),
)

Expand Down Expand Up @@ -149,6 +155,8 @@ func (srv *ExecutorApi) LeaseJobRuns(stream executorapi.ExecutorApi_LeaseJobRuns

srv.addPreemptibleLabel(submitMsg)

srv.dropDisallowedResources(submitMsg.MainObject.GetPodSpec().PodSpec)

// This must happen after anything that relies on the priorityClassName
if srv.priorityClassNameOverride != nil {
srv.setPriorityClassName(submitMsg, *srv.priorityClassNameOverride)
Expand Down Expand Up @@ -216,6 +224,29 @@ func (srv *ExecutorApi) addPreemptibleLabel(job *armadaevents.SubmitJob) {
addLabels(job, labels)
}

// Drop non-supported resources. This is needed to ensure floating resources
// are not passed to k8s.
func (srv *ExecutorApi) dropDisallowedResources(pod *v1.PodSpec) {
if pod == nil {
return
}
srv.dropDisallowedResourcesFromContainers(pod.InitContainers)
srv.dropDisallowedResourcesFromContainers(pod.Containers)
}

func (srv *ExecutorApi) dropDisallowedResourcesFromContainers(containers []v1.Container) {
for _, container := range containers {
removeDisallowedKeys(container.Resources.Limits, srv.allowedResources)
removeDisallowedKeys(container.Resources.Requests, srv.allowedResources)
}
}

func removeDisallowedKeys(rl v1.ResourceList, allowedKeys map[string]bool) {
maps.RemoveInPlace(rl, func(name v1.ResourceName) bool {
return !allowedKeys[string(name)]
})
}

func (srv *ExecutorApi) isPreemptible(job *armadaevents.SubmitJob) bool {
priorityClassName := ""

Expand Down
9 changes: 9 additions & 0 deletions internal/scheduler/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ import (
"github.com/armadaproject/armada/internal/common/mocks"
protoutil "github.com/armadaproject/armada/internal/common/proto"
"github.com/armadaproject/armada/internal/common/pulsarutils"
"github.com/armadaproject/armada/internal/common/slices"
"github.com/armadaproject/armada/internal/common/types"
schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration"
"github.com/armadaproject/armada/internal/scheduler/database"
schedulermocks "github.com/armadaproject/armada/internal/scheduler/mocks"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
"github.com/armadaproject/armada/internal/scheduler/testfixtures"
"github.com/armadaproject/armada/pkg/api"
"github.com/armadaproject/armada/pkg/armadaevents"
"github.com/armadaproject/armada/pkg/executorapi"
Expand Down Expand Up @@ -321,6 +324,7 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) {
mockJobRepository,
mockExecutorRepository,
[]int32{1000, 2000},
testResourceNames(),
"kubernetes.io/hostname",
nil,
priorityClasses,
Expand Down Expand Up @@ -448,6 +452,7 @@ func TestExecutorApi_Publish(t *testing.T) {
mockJobRepository,
mockExecutorRepository,
[]int32{1000, 2000},
testResourceNames(),
"kubernetes.io/hostname",
nil,
priorityClasses,
Expand Down Expand Up @@ -495,3 +500,7 @@ func groups(t *testing.T) ([]string, []byte) {
require.NoError(t, err)
return groups, compressed
}

func testResourceNames() []string {
return slices.Map(testfixtures.GetTestSupportedResourceTypes(), func(rt schedulerconfig.ResourceType) string { return rt.Name })
}
20 changes: 20 additions & 0 deletions internal/scheduler/configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,19 @@ type LeaderConfig struct {
LeaderConnection client.ApiConnectionDetails
}

type FloatingResourceConfig struct {
// Resource name, e.g. "s3-connections"
Name string
// Per-pool config.
Pools []FloatingResourcePoolConfig
}

type FloatingResourcePoolConfig struct {
// Name of the pool.
Name string
// Amount of this resource that can be allocated across all jobs in this pool.
Quantity resource.Quantity
}
type HttpConfig struct {
Port int `validate:"required"`
}
Expand Down Expand Up @@ -230,6 +243,13 @@ type SchedulingConfig struct {
//
// If not set, all taints are indexed.
IndexedTaints []string
// Experimental - subject to change
// Resources that are outside of k8s, and not tied to a given k8s node or cluster.
// For example connections to an S3 server that sits outside of k8s could be rationed to limit load on the server.
// These can be requested like a normal k8s resource. Note there is no mechanism in armada
// to enforce actual usage, it relies on honesty. For example, there is nothing to stop a badly-behaved job
// requesting 2 S3 server connections and then opening 10.
ExperimentalFloatingResources []FloatingResourceConfig
// WellKnownNodeTypes defines a set of well-known node types used to define "home" and "away" nodes for a given priority class.
WellKnownNodeTypes []WellKnownNodeType `validate:"dive"`
// Executor that haven't heartbeated in this time period are considered stale.
Expand Down
Loading

0 comments on commit 91a346f

Please sign in to comment.