TAS: introduce scheduled-level tests
mimowo committed Oct 29, 2024
1 parent: 90ef607 · commit: 39bb53e
Showing 5 changed files with 414 additions and 75 deletions.
66 changes: 0 additions & 66 deletions pkg/cache/tas_cache_test.go
@@ -27,14 +27,10 @@ import (
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"

kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/resources"
"sigs.k8s.io/kueue/pkg/util/limitrange"
utiltas "sigs.k8s.io/kueue/pkg/util/tas"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
testingpod "sigs.k8s.io/kueue/pkg/util/testingjobs/pod"
"sigs.k8s.io/kueue/pkg/workload"
)

func TestFindTopologyAssignment(t *testing.T) {
@@ -805,51 +801,6 @@ func TestFindTopologyAssignment(t *testing.T) {
},
},
},
"don't double-count TAS pods when computing the capacity": {
nodes: []corev1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "x1",
Labels: map[string]string{
tasHostLabel: "x1",
},
},
Status: corev1.NodeStatus{
Allocatable: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("1Gi"),
},
},
},
},
pods: []corev1.Pod{
*testingpod.MakePod("test-tas", "test-ns").NodeName("x1").
Request(corev1.ResourceCPU, "400m").
NodeSelector(tasHostLabel, "x1").
Label(kueuealpha.TASLabel, "true").
StatusPhase(corev1.PodRunning).
Obj(),
},
request: kueue.PodSetTopologyRequest{
Required: ptr.To(tasHostLabel),
},
levels: defaultOneLevel,
requests: resources.Requests{
corev1.ResourceCPU: 600,
},
count: 1,
wantAssignment: &kueue.TopologyAssignment{
Levels: defaultOneLevel,
Domains: []kueue.TopologyDomainAssignment{
{
Count: 1,
Values: []string{
"x1",
},
},
},
},
},
"include usage from pending scheduled non-TAS pods, blocked assignment": {
// there is not enough free capacity on the only node x1
nodes: []corev1.Node{
@@ -995,23 +946,6 @@ func TestFindTopologyAssignment(t *testing.T) {
tasCache := NewTASCache(client)
tasFlavorCache := tasCache.NewTASFlavorCache(tc.levels, tc.nodeLabels)

// account for usage from TAS pods for the purposes of the unit tests.
// note that in practice the usage is accounted based on TAS
// Workloads.
for _, pod := range tc.pods {
if _, ok := pod.Labels[kueuealpha.TASLabel]; ok {
levelValues := utiltas.LevelValues(tc.levels, pod.Spec.NodeSelector)
requests := limitrange.TotalRequests(&pod.Spec)
usage := resources.NewRequests(requests)
tasFlavorCache.addUsage([]workload.TopologyDomainRequests{
{
Values: levelValues,
Requests: usage,
},
})
}
}

snapshot, err := tasFlavorCache.snapshot(ctx)
if err != nil {
t.Fatalf("failed to build the snapshot: %v", err)
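
The removed block above seeded TAS usage in the cache directly from pods. That scenario moves to the new TestScheduleForTAS below, which seeds usage through an admitted Workload instead. A minimal sketch of the pattern (the builder chain mirrors the scheduler test later in this commit; values are illustrative):

	wl := utiltesting.MakeWorkload("bar-admitted", "default").
		Queue("tas-main").
		ReserveQuota(
			utiltesting.MakeAdmission("tas-main", "one").
				Assignment(corev1.ResourceCPU, "tas-default", "400m").
				AssignmentPodCount(1).
				TopologyAssignment(&kueue.TopologyAssignment{
					Levels:  []string{"kubernetes.io/hostname"},
					Domains: []kueue.TopologyDomainAssignment{{Count: 1, Values: []string{"x1"}}},
				}).Obj(),
		).
		Admitted(true).
		Obj()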
11 changes: 2 additions & 9 deletions pkg/controller/tas/resource_flavor.go
@@ -39,6 +39,7 @@ import (
"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/controller/core"
"sigs.k8s.io/kueue/pkg/queue"
utiltas "sigs.k8s.io/kueue/pkg/util/tas"
)

const (
@@ -152,7 +153,7 @@ func (r *rfReconciler) Reconcile(ctx context.Context, req reconcile.Request) (re
}, &topology); err != nil {
return reconcile.Result{}, err
}
levels := r.levels(&topology)
levels := utiltas.Levels(&topology)
tasInfo := r.tasCache.NewTASFlavorCache(levels, flv.Spec.NodeLabels)
r.tasCache.Set(kueue.ResourceFlavorReference(flv.Name), tasInfo)
}
@@ -221,11 +222,3 @@ func nodeBelongsToFlavor(node *corev1.Node, nodeLabels map[string]string, levels
}
return true
}

func (r *rfReconciler) levels(topology *kueuealpha.Topology) []string {
result := make([]string, len(topology.Spec.Levels))
for i, level := range topology.Spec.Levels {
result[i] = level.NodeLabel
}
return result
}
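
The private levels helper removed here is promoted to the shared utiltas package (pkg/util/tas/tas.go, later in this commit), so the reconciler and the new scheduler tests can both derive the level keys from a Topology.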
388 changes: 388 additions & 0 deletions pkg/scheduler/scheduler_test.go
@@ -41,6 +41,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client/interceptor"

config "sigs.k8s.io/kueue/apis/config/v1beta1"
kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/constants"
@@ -50,7 +51,10 @@ import (
"sigs.k8s.io/kueue/pkg/resources"
"sigs.k8s.io/kueue/pkg/scheduler/flavorassigner"
"sigs.k8s.io/kueue/pkg/util/routine"
"sigs.k8s.io/kueue/pkg/util/slices"
utiltas "sigs.k8s.io/kueue/pkg/util/tas"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
testingpod "sigs.k8s.io/kueue/pkg/util/testingjobs/pod"
"sigs.k8s.io/kueue/pkg/workload"
)

@@ -3858,3 +3862,387 @@ func TestResourcesToReserve(t *testing.T) {
})
}
}

func TestScheduleForTAS(t *testing.T) {
const (
tasHostLabel = "kubernetes.io/hostname"
)
defaultSingleNode := []corev1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "x1",
Labels: map[string]string{
"tas-node": "true",
tasHostLabel: "x1",
},
},
Status: corev1.NodeStatus{
Allocatable: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("1Gi"),
},
},
},
}
defaultSingleLevelTopology := kueuealpha.Topology{
ObjectMeta: metav1.ObjectMeta{
Name: "tas-three-levels",
},
Spec: kueuealpha.TopologySpec{
Levels: []kueuealpha.TopologyLevel{
{
NodeLabel: tasHostLabel,
},
},
},
}
defaultResourceFlavor := kueue.ResourceFlavor{
ObjectMeta: metav1.ObjectMeta{
Name: "tas-default",
},
Spec: kueue.ResourceFlavorSpec{
NodeLabels: map[string]string{
"tas-node": "true",
},
TopologyName: ptr.To("tas-three-levels"),
},
}
defaultClusterQueue := *utiltesting.MakeClusterQueue("tas-main").
ResourceGroup(*utiltesting.MakeFlavorQuotas("tas-default").
Resource(corev1.ResourceCPU, "50").Obj()).
Obj()
queues := []kueue.LocalQueue{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "tas-main",
},
Spec: kueue.LocalQueueSpec{
ClusterQueue: "tas-main",
},
},
}
cases := map[string]struct {
nodes []corev1.Node
pods []corev1.Pod
topologies []kueuealpha.Topology
resourceFlavors []kueue.ResourceFlavor
clusterQueues []kueue.ClusterQueue
workloads []kueue.Workload

// wantNewAssignments is a summary of all the admissions in the cache after this cycle.
wantNewAssignments map[string]kueue.Admission
// wantLeft is the workload keys that are left in the queues after this cycle.
wantLeft map[string][]string
// wantInadmissibleLeft is the workload keys that are left in the inadmissible state after this cycle.
wantInadmissibleLeft map[string][]string
// wantEvents is checked only if non-empty; the Message field is always ignored (it contains the duration)
wantEvents []utiltesting.EventRecord
}{
"workload required TAS gets scheduled": {
nodes: defaultSingleNode,
topologies: []kueuealpha.Topology{defaultSingleLevelTopology},
resourceFlavors: []kueue.ResourceFlavor{defaultResourceFlavor},
clusterQueues: []kueue.ClusterQueue{defaultClusterQueue},
workloads: []kueue.Workload{
*utiltesting.MakeWorkload("foo", "default").
Queue("tas-main").
PodSets(*utiltesting.MakePodSet("one", 1).
RequiredTopologyRequest(tasHostLabel).
Request(corev1.ResourceCPU, "1").
Obj()).
Obj(),
},
wantNewAssignments: map[string]kueue.Admission{
"default/foo": *utiltesting.MakeAdmission("tas-main", "one").
Assignment(corev1.ResourceCPU, "tas-default", "1000m").
AssignmentPodCount(1).
TopologyAssignment(&kueue.TopologyAssignment{
Levels: utiltas.Levels(&defaultSingleLevelTopology),
Domains: []kueue.TopologyDomainAssignment{
{
Count: 1,
Values: []string{
"x1",
},
},
},
}).Obj(),
},
wantEvents: []utiltesting.EventRecord{
{
Key: types.NamespacedName{Namespace: "default", Name: "foo"},
Reason: "QuotaReserved",
EventType: corev1.EventTypeNormal,
},
{
Key: types.NamespacedName{Namespace: "default", Name: "foo"},
Reason: "Admitted",
EventType: corev1.EventTypeNormal,
},
},
},
"workload does not get scheduled as it does not fit within the node capacity": {
nodes: defaultSingleNode,
topologies: []kueuealpha.Topology{defaultSingleLevelTopology},
resourceFlavors: []kueue.ResourceFlavor{defaultResourceFlavor},
clusterQueues: []kueue.ClusterQueue{defaultClusterQueue},
workloads: []kueue.Workload{
*utiltesting.MakeWorkload("foo", "default").
Queue("tas-main").
PodSets(*utiltesting.MakePodSet("one", 2).
RequiredTopologyRequest(tasHostLabel).
Request(corev1.ResourceCPU, "1").
Obj()).
Obj(),
},
wantInadmissibleLeft: map[string][]string{
"tas-main": {"default/foo"},
},
},
"workload does not get scheduled as the node capacity is already used by another TAS workload": {
nodes: defaultSingleNode,
topologies: []kueuealpha.Topology{defaultSingleLevelTopology},
resourceFlavors: []kueue.ResourceFlavor{defaultResourceFlavor},
clusterQueues: []kueue.ClusterQueue{defaultClusterQueue},
workloads: []kueue.Workload{
*utiltesting.MakeWorkload("foo", "default").
Queue("tas-main").
PodSets(*utiltesting.MakePodSet("one", 1).
RequiredTopologyRequest(tasHostLabel).
Request(corev1.ResourceCPU, "1").
Obj()).
Obj(),
*utiltesting.MakeWorkload("bar-admitted", "default").
Queue("tas-main").
ReserveQuota(
utiltesting.MakeAdmission("tas-main", "one").
Assignment(corev1.ResourceCPU, "tas-default", "1000m").
AssignmentPodCount(1).
TopologyAssignment(&kueue.TopologyAssignment{
Levels: utiltas.Levels(&defaultSingleLevelTopology),
Domains: []kueue.TopologyDomainAssignment{
{
Count: 1,
Values: []string{
"x1",
},
},
},
}).Obj(),
).
Admitted(true).
PodSets(*utiltesting.MakePodSet("one", 1).
RequiredTopologyRequest(tasHostLabel).
Request(corev1.ResourceCPU, "1").
Obj()).
Obj(),
},
wantInadmissibleLeft: map[string][]string{
"tas-main": {"default/foo"},
},
},
"workload does not get scheduled as the node capacity is already used by a non-TAS pod": {
nodes: defaultSingleNode,
pods: []corev1.Pod{
*testingpod.MakePod("test-pending", "test-ns").NodeName("x1").
StatusPhase(corev1.PodPending).
Request(corev1.ResourceCPU, "600m").
Obj(),
},
topologies: []kueuealpha.Topology{defaultSingleLevelTopology},
resourceFlavors: []kueue.ResourceFlavor{defaultResourceFlavor},
clusterQueues: []kueue.ClusterQueue{defaultClusterQueue},
workloads: []kueue.Workload{
*utiltesting.MakeWorkload("foo", "default").
Queue("tas-main").
PodSets(*utiltesting.MakePodSet("one", 1).
RequiredTopologyRequest(tasHostLabel).
Request(corev1.ResourceCPU, "1").
Obj()).
Obj(),
},
wantInadmissibleLeft: map[string][]string{
"tas-main": {"default/foo"},
},
},
"workload gets scheduled as the usage of TAS pods and workloads is not double-counted": {
nodes: defaultSingleNode,
pods: []corev1.Pod{
*testingpod.MakePod("test-running", "test-ns").NodeName("x1").
StatusPhase(corev1.PodRunning).
Request(corev1.ResourceCPU, "400m").
NodeSelector(tasHostLabel, "x1").
Label(kueuealpha.TASLabel, "true").
Obj(),
},
topologies: []kueuealpha.Topology{defaultSingleLevelTopology},
resourceFlavors: []kueue.ResourceFlavor{defaultResourceFlavor},
clusterQueues: []kueue.ClusterQueue{defaultClusterQueue},
workloads: []kueue.Workload{
*utiltesting.MakeWorkload("foo", "default").
Queue("tas-main").
PodSets(*utiltesting.MakePodSet("one", 1).
RequiredTopologyRequest(tasHostLabel).
Request(corev1.ResourceCPU, "500m").
Obj()).
Obj(),
*utiltesting.MakeWorkload("bar-admitted", "default").
Queue("tas-main").
ReserveQuota(
utiltesting.MakeAdmission("tas-main", "one").
Assignment(corev1.ResourceCPU, "tas-default", "400m").
AssignmentPodCount(1).
TopologyAssignment(&kueue.TopologyAssignment{
Levels: utiltas.Levels(&defaultSingleLevelTopology),
Domains: []kueue.TopologyDomainAssignment{
{
Count: 1,
Values: []string{
"x1",
},
},
},
}).Obj(),
).
Admitted(true).
PodSets(*utiltesting.MakePodSet("one", 1).
RequiredTopologyRequest(tasHostLabel).
Request(corev1.ResourceCPU, "400m").
Obj()).
Obj(),
},
wantNewAssignments: map[string]kueue.Admission{
"default/foo": *utiltesting.MakeAdmission("tas-main", "one").
Assignment(corev1.ResourceCPU, "tas-default", "500m").
AssignmentPodCount(1).
TopologyAssignment(&kueue.TopologyAssignment{
Levels: utiltas.Levels(&defaultSingleLevelTopology),
Domains: []kueue.TopologyDomainAssignment{
{
Count: 1,
Values: []string{
"x1",
},
},
},
}).Obj(),
},
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
features.SetFeatureGateDuringTest(t, features.TopologyAwareScheduling, true)
ctx, _ := utiltesting.ContextWithLog(t)

clientBuilder := utiltesting.NewClientBuilder().
WithLists(
&kueue.WorkloadList{Items: tc.workloads},
&kueuealpha.TopologyList{Items: tc.topologies},
&corev1.PodList{Items: tc.pods},
&corev1.NodeList{Items: tc.nodes},
&kueue.LocalQueueList{Items: queues}).
WithObjects(
&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "default"}},
)
cl := clientBuilder.Build()
recorder := &utiltesting.EventRecorder{}
cqCache := cache.New(cl)
qManager := queue.NewManager(cl, cqCache)
topologyByName := slices.ToMap(tc.topologies, func(i int) (string, kueuealpha.Topology) {
return tc.topologies[i].Name, tc.topologies[i]
})
for _, flavor := range tc.resourceFlavors {
cqCache.AddOrUpdateResourceFlavor(&flavor)
if flavor.Spec.TopologyName != nil {
t := topologyByName[*flavor.Spec.TopologyName]
tasCache := cqCache.TASCache()
levels := utiltas.Levels(&t)
tasFlavorCache := tasCache.NewTASFlavorCache(levels, flavor.Spec.NodeLabels)
tasCache.Set(kueue.ResourceFlavorReference(flavor.Name), tasFlavorCache)
}
}
for _, cq := range tc.clusterQueues {
if err := cqCache.AddClusterQueue(ctx, &cq); err != nil {
t.Fatalf("Inserting clusterQueue %s in cache: %v", cq.Name, err)
}
if err := qManager.AddClusterQueue(ctx, &cq); err != nil {
t.Fatalf("Inserting clusterQueue %s in manager: %v", cq.Name, err)
}
if err := cl.Create(ctx, &cq); err != nil {
t.Fatalf("couldn't create the cluster queue: %v", err)
}
}
for _, q := range queues {
if err := qManager.AddLocalQueue(ctx, &q); err != nil {
t.Fatalf("Inserting queue %s/%s in manager: %v", q.Namespace, q.Name, err)
}
}
initiallyAdmittedWorkloads := sets.New[string]()
for _, w := range tc.workloads {
if workload.IsAdmitted(&w) {
initiallyAdmittedWorkloads.Insert(workload.Key(&w))
} else {
qManager.AddOrUpdateWorkload(&w)
}
}
scheduler := New(qManager, cqCache, cl, recorder)
gotScheduled := make([]string, 0)
var mu sync.Mutex
scheduler.applyAdmission = func(ctx context.Context, w *kueue.Workload) error {
mu.Lock()
gotScheduled = append(gotScheduled, workload.Key(w))
mu.Unlock()
return nil
}
wg := sync.WaitGroup{}
scheduler.setAdmissionRoutineWrapper(routine.NewWrapper(
func() { wg.Add(1) },
func() { wg.Done() },
))

ctx, cancel := context.WithTimeout(ctx, queueingTimeout)
go qManager.CleanUpOnContext(ctx)
defer cancel()

scheduler.schedule(ctx)
wg.Wait()
snapshot, err := cqCache.Snapshot(ctx)
if err != nil {
t.Fatalf("unexpected error while building snapshot: %v", err)
}
gotAssignments := make(map[string]kueue.Admission)
for cqName, c := range snapshot.ClusterQueues {
for name, w := range c.Workloads {
if initiallyAdmittedWorkloads.Has(workload.Key(w.Obj)) {
continue
}
switch {
case !workload.HasQuotaReservation(w.Obj):
t.Fatalf("Workload %s is not admitted by a clusterQueue, but it is found as member of clusterQueue %s in the cache", name, cqName)
case string(w.Obj.Status.Admission.ClusterQueue) != cqName:
t.Fatalf("Workload %s is admitted by clusterQueue %s, but it is found as member of clusterQueue %s in the cache", name, w.Obj.Status.Admission.ClusterQueue, cqName)
default:
gotAssignments[name] = *w.Obj.Status.Admission
}
}
}
if diff := cmp.Diff(tc.wantNewAssignments, gotAssignments, cmpopts.EquateEmpty()); diff != "" {
t.Errorf("Unexpected assigned clusterQueues in cache (-want,+got):\n%s", diff)
}
qDump := qManager.Dump()
if diff := cmp.Diff(tc.wantLeft, qDump, cmpDump...); diff != "" {
t.Errorf("Unexpected elements left in the queue (-want,+got):\n%s", diff)
}
qDumpInadmissible := qManager.DumpInadmissible()
if diff := cmp.Diff(tc.wantInadmissibleLeft, qDumpInadmissible, cmpDump...); diff != "" {
t.Errorf("Unexpected elements left in inadmissible workloads (-want,+got):\n%s", diff)
}
if len(tc.wantEvents) > 0 {
if diff := cmp.Diff(tc.wantEvents, recorder.RecordedEvents, cmpopts.IgnoreFields(utiltesting.EventRecord{}, "Message")); diff != "" {
t.Errorf("unexpected events (-want/+got):\n%s", diff)
}
}
})
}
}
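
With this table-driven setup, new scenarios are cheap to add. A hypothetical extra case (not part of this commit) sketches how the PreferredTopologyRequest wrapper added below would slot in; the want fields are left as comments, since the expected placement depends on how preferred requests are handled at this commit:

	"workload with preferred TAS gets scheduled (hypothetical)": {
		nodes:           defaultSingleNode,
		topologies:      []kueuealpha.Topology{defaultSingleLevelTopology},
		resourceFlavors: []kueue.ResourceFlavor{defaultResourceFlavor},
		clusterQueues:   []kueue.ClusterQueue{defaultClusterQueue},
		workloads: []kueue.Workload{
			*utiltesting.MakeWorkload("baz", "default").
				Queue("tas-main").
				PodSets(*utiltesting.MakePodSet("one", 1).
					PreferredTopologyRequest(tasHostLabel).
					Request(corev1.ResourceCPU, "1").
					Obj()).
				Obj(),
		},
		// wantNewAssignments / wantInadmissibleLeft: fill in based on
		// whether preferred placement is admitted at this commit.
	},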
10 changes: 10 additions & 0 deletions pkg/util/tas/tas.go
@@ -18,6 +18,8 @@ package tas

import (
"strings"

kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
)

type TopologyDomainID string
@@ -41,3 +43,11 @@ func LevelValues(levelKeys []string, objectLabels map[string]string) []string {
}
return levelValues
}

func Levels(topology *kueuealpha.Topology) []string {
result := make([]string, len(topology.Spec.Levels))
for i, level := range topology.Spec.Levels {
result[i] = level.NodeLabel
}
return result
}
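
A small illustration (not from the commit) of how the two helpers compose: the new Levels extracts the ordered node-label keys from a Topology, and the existing LevelValues reads the matching values off an object's labels. The block label key is a made-up example:

	topology := kueuealpha.Topology{
		Spec: kueuealpha.TopologySpec{
			Levels: []kueuealpha.TopologyLevel{
				{NodeLabel: "example.com/topology-block"}, // hypothetical label key
				{NodeLabel: "kubernetes.io/hostname"},
			},
		},
	}
	keys := Levels(&topology)
	// keys == []string{"example.com/topology-block", "kubernetes.io/hostname"}
	values := LevelValues(keys, map[string]string{
		"example.com/topology-block": "b1",
		"kubernetes.io/hostname":     "x1",
	})
	// values == []string{"b1", "x1"}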
14 changes: 14 additions & 0 deletions pkg/util/testing/wrappers.go
@@ -398,6 +398,20 @@ func (p *PodSetWrapper) RuntimeClass(name string) *PodSetWrapper {
return p
}

func (p *PodSetWrapper) RequiredTopologyRequest(level string) *PodSetWrapper {
p.TopologyRequest = &kueue.PodSetTopologyRequest{
Required: ptr.To(level),
}
return p
}

func (p *PodSetWrapper) PreferredTopologyRequest(level string) *PodSetWrapper {
p.TopologyRequest = &kueue.PodSetTopologyRequest{
Preferred: ptr.To(level),
}
return p
}

func (p *PodSetWrapper) Obj() *kueue.PodSet {
return &p.PodSet
}
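
Both wrappers set the PodSet's TopologyRequest with a single level key; usage mirrors the scheduler tests above:

	// Hard requirement: the pod set must land within one hostname domain.
	required := utiltesting.MakePodSet("one", 1).
		RequiredTopologyRequest("kubernetes.io/hostname").
		Request(corev1.ResourceCPU, "1").
		Obj()
	// Best-effort placement on the same level.
	preferred := utiltesting.MakePodSet("one", 1).
		PreferredTopologyRequest("kubernetes.io/hostname").
		Request(corev1.ResourceCPU, "1").
		Obj()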
