Skip to content

Commit

Permalink
Calculate assignments for a workload
Browse files Browse the repository at this point in the history
Change-Id: I150abfd1b54b32a38561c9ee0915cf0bf41a9bc5
  • Loading branch information
alculquicondor committed Feb 17, 2022
1 parent d181dee commit 8467cdc
Show file tree
Hide file tree
Showing 6 changed files with 573 additions and 29 deletions.
36 changes: 11 additions & 25 deletions pkg/capacity/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

kueue "gke-internal.googlesource.com/gke-batch/kueue/api/v1alpha1"
utiltesting "gke-internal.googlesource.com/gke-batch/kueue/pkg/util/testing"
"gke-internal.googlesource.com/gke-batch/kueue/pkg/workload"
)

Expand Down Expand Up @@ -122,7 +123,7 @@ func TestSnapshot(t *testing.T) {
Pods: []kueue.PodSet{
{
Count: 5,
Spec: podSpecForRequest(map[corev1.ResourceName]string{
Spec: utiltesting.PodSpecForRequest(map[corev1.ResourceName]string{
corev1.ResourceCPU: "2",
}),
AssignedTypes: map[corev1.ResourceName]string{
Expand All @@ -139,7 +140,7 @@ func TestSnapshot(t *testing.T) {
Pods: []kueue.PodSet{
{
Count: 5,
Spec: podSpecForRequest(map[corev1.ResourceName]string{
Spec: utiltesting.PodSpecForRequest(map[corev1.ResourceName]string{
corev1.ResourceCPU: "1",
"example.com/gpu": "2",
}),
Expand All @@ -158,7 +159,7 @@ func TestSnapshot(t *testing.T) {
Pods: []kueue.PodSet{
{
Count: 5,
Spec: podSpecForRequest(map[corev1.ResourceName]string{
Spec: utiltesting.PodSpecForRequest(map[corev1.ResourceName]string{
corev1.ResourceCPU: "1",
"example.com/gpu": "1",
}),
Expand Down Expand Up @@ -201,8 +202,8 @@ func TestSnapshot(t *testing.T) {
wantSnapshot := Snapshot{
Capacities: map[string]*Capacity{
"foofoo": {
Name: "foofoo",
Cohort: &wantCohorts[0],
Name: "foofoo",
Cohort: &wantCohorts[0],
RequestableResources: map[corev1.ResourceName][]kueue.ResourceType{
corev1.ResourceCPU: {
{
Expand Down Expand Up @@ -230,8 +231,8 @@ func TestSnapshot(t *testing.T) {
},
},
"foobar": {
Name: "foobar",
Cohort: &wantCohorts[0],
Name: "foobar",
Cohort: &wantCohorts[0],
RequestableResources: map[corev1.ResourceName][]kueue.ResourceType{
corev1.ResourceCPU: {
{
Expand Down Expand Up @@ -263,7 +264,7 @@ func TestSnapshot(t *testing.T) {
},
},
"bar": {
Name: "bar",
Name: "bar",
RequestableResources: map[corev1.ResourceName][]kueue.ResourceType{
corev1.ResourceCPU: {
{
Expand All @@ -276,6 +277,7 @@ func TestSnapshot(t *testing.T) {
UsedResources: Resources{
corev1.ResourceCPU: map[string]int64{"": 0},
},
Workloads: map[string]*workload.Info{},
},
},
}
Expand All @@ -287,23 +289,7 @@ func TestSnapshot(t *testing.T) {
m.Cohort = &wantCohorts[i]
}
}
if diff := cmp.Diff(wantSnapshot, snapshot, cmpopts.IgnoreUnexported(Cohort{}), cmpopts.IgnoreFields(Capacity{}, "Workloads")); diff != "" {
if diff := cmp.Diff(wantSnapshot, snapshot, cmpopts.IgnoreUnexported(Cohort{})); diff != "" {
t.Errorf("Unexpected Snapshot (-want,+got):\n%s", diff)
}
}

func podSpecForRequest(request map[corev1.ResourceName]string) corev1.PodSpec {
rl := make(corev1.ResourceList, len(request))
for name, val := range request {
rl[name] = resource.MustParse(val)
}
return corev1.PodSpec{
Containers: []corev1.Container{
{
Resources: corev1.ResourceRequirements{
Requests: rl,
},
},
},
}
}
4 changes: 2 additions & 2 deletions pkg/queue/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@ func (m *Manager) CleanUpOnContext(ctx context.Context) {
m.cond.Broadcast()
}

// Heads returns the heads of the queues, blocking if they are empty until
// they have elements or the context terminates.
// Heads returns the heads of the queues, along with their associated capacity.
// It blocks if the queues empty until they have elements or the context terminates.
func (m *Manager) Heads(ctx context.Context) []workload.Info {
m.Lock()
defer m.Unlock()
Expand Down
134 changes: 132 additions & 2 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@ import (
"context"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"

kueue "gke-internal.googlesource.com/gke-batch/kueue/api/v1alpha1"
"gke-internal.googlesource.com/gke-batch/kueue/pkg/capacity"
"gke-internal.googlesource.com/gke-batch/kueue/pkg/queue"
"k8s.io/apimachinery/pkg/util/wait"
"gke-internal.googlesource.com/gke-batch/kueue/pkg/workload"
)

type Scheduler struct {
Expand All @@ -45,9 +49,135 @@ func (s *Scheduler) Start(ctx context.Context) {
}

func (s *Scheduler) schedule(ctx context.Context) {
// 1. Get the heads from the queues, including their desired capacity.
// This operation blocks while the queues are empty.
headWorkloads := s.queues.Heads(ctx)
// No elements means the program is finishing.
if len(headWorkloads) == 0 {
return
}
// schedule

// 2. Take a snapshot of the capacities and their usage.
snapshot := s.capacityCache.Snapshot()

// 3. Calculate requirements for assigning workloads to capacities
// (resource types, borrowing).
_ = calculateRequirementsForAssignments(ctx, headWorkloads, snapshot)
// TODO: schedule
}

// entry holds requirements for a workload to be scheduled in a capacity.
type entry struct {
// workload.Info holds the workload from the API as well as resource usage
// and types assigned.
workload.Info
// borrows is the resouces that the workload would need to borrow from the
// cohort if it was scheduled in the capacity.
borrows capacity.Resources
}

// calculateRequirementsForAssignments returns the workloads with their
// requirements (resource types, borrowing) if they were assigned to the
// capacities in the snapshot.
func calculateRequirementsForAssignments(ctx context.Context, workloads []workload.Info, snap capacity.Snapshot) []entry {
log := logr.FromContext(ctx)
entries := make([]entry, 0, len(workloads))
for _, w := range workloads {
log := log.WithValues("queuedWorkload", klog.KObj(w.Obj), "capacity", w.Capacity)
cap := snap.Capacities[w.Capacity]
if cap == nil {
log.V(3).Info("Capacity not found when calculating workload assingments")
continue
}
e := entry{Info: w}
if !e.assignTypes(cap) {
log.V(2).Info("Workload didn't fit in remaining capacity even when borrowing")
}
entries = append(entries, e)
}
return entries
}

// assignTypes calculates the types that should be assigned to this entry
// if scheduled to this capacity, including details of how much it needs to
// borrow from the cohort.
// It returns whether the entry would fit. If it doesn't fit, the object is
// unmodified.
func (e *entry) assignTypes(cap *capacity.Capacity) bool {
typedRequests := make(map[string]workload.Resources, len(e.TotalRequests))
wUsed := make(capacity.Resources)
wBorrows := make(capacity.Resources)
for psName, podSet := range e.TotalRequests {
types := make(map[corev1.ResourceName]string, len(podSet.Requests))
for resName, reqVal := range podSet.Requests {
rType, borrow := findTypeForResource(resName, reqVal, cap, wUsed[resName])
if rType == "" {
return false
}
if borrow > 0 {
if wBorrows[resName] == nil {
wBorrows[resName] = make(map[string]int64)
}
// Don't accumulate borrowing. The returned `borrow` already considers
// usage from previous pod sets.
wBorrows[resName][rType] = borrow
}
if wUsed[resName] == nil {
wUsed[resName] = make(map[string]int64)
}
wUsed[resName][rType] += reqVal
types[resName] = rType
}
typedRequests[psName] = workload.Resources{
Requests: podSet.Requests,
Types: types,
}
}
e.TotalRequests = typedRequests
if len(wBorrows) > 0 {
e.borrows = wBorrows
}
return true
}

// findTypeForResources returns a type which can satisfy the resource request,
// given that wUsed is the usage of types by previous podsets.
// If it finds a type, also returns any borrowing required.
func findTypeForResource(name corev1.ResourceName, val int64, cap *capacity.Capacity, wUsed map[string]int64) (string, int64) {
for _, rType := range cap.RequestableResources[name] {
// Consider the usage assigned to previous pod sets.
ok, borrow := canAssignType(name, val+wUsed[rType.Name], cap, &rType)
if ok {
return rType.Name, borrow
}
}
return "", 0
}

// canAssignType returns whether a requested resource fits in a specific type.
// If it fits, also returns any borrowing required.
func canAssignType(name corev1.ResourceName, val int64, cap *capacity.Capacity, rType *kueue.ResourceType) (bool, int64) {
ceiling := workload.ResourceValue(name, rType.Quota.Ceiling)
used := cap.UsedResources[name][rType.Name]
if used+val > ceiling {
// Past borrowing limit.
return false, 0
}
guaranteed := workload.ResourceValue(name, rType.Quota.Guaranteed)
cohortUsed := used
cohortTotal := guaranteed
if cap.Cohort != nil {
cohortUsed = cap.Cohort.UsedResources[name][rType.Name]
cohortTotal = cap.Cohort.RequestableResources[name][rType.Name]
}
borrow := used + val - guaranteed
if borrow < 0 {
borrow = 0
}
if cohortUsed+val > cohortTotal {
// Doesn't fit even with borrowing.
// TODO(PostMVP): preemption could help if borrow == 0
return false, 0
}
return true, borrow
}
Loading

0 comments on commit 8467cdc

Please sign in to comment.