diff --git a/pkg/cache/clusterqueue.go b/pkg/cache/clusterqueue.go index 385e254a4d..14a6c92459 100644 --- a/pkg/cache/clusterqueue.go +++ b/pkg/cache/clusterqueue.go @@ -173,15 +173,15 @@ func (c *clusterQueue) updateClusterQueue(cycleChecker hierarchy.CycleChecker, i c.AdmissionChecks = utilac.NewAdmissionChecks(in) - c.UpdateWithFlavors(resourceFlavors) - c.updateWithAdmissionChecks(admissionChecks) - if in.Spec.Preemption != nil { c.Preemption = *in.Spec.Preemption } else { c.Preemption = defaultPreemption } + c.UpdateWithFlavors(resourceFlavors) + c.updateWithAdmissionChecks(admissionChecks) + if in.Spec.FlavorFungibility != nil { c.FlavorFungibility = *in.Spec.FlavorFungibility if c.FlavorFungibility.WhenCanBorrow == "" { diff --git a/pkg/cache/tas_cache_test.go b/pkg/cache/tas_cache_test.go index 9a4dfb7b8f..2b5a062d9f 100644 --- a/pkg/cache/tas_cache_test.go +++ b/pkg/cache/tas_cache_test.go @@ -27,14 +27,10 @@ import ( "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" - kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/resources" - "sigs.k8s.io/kueue/pkg/util/limitrange" - utiltas "sigs.k8s.io/kueue/pkg/util/tas" utiltesting "sigs.k8s.io/kueue/pkg/util/testing" testingpod "sigs.k8s.io/kueue/pkg/util/testingjobs/pod" - "sigs.k8s.io/kueue/pkg/workload" ) func TestFindTopologyAssignment(t *testing.T) { @@ -805,51 +801,6 @@ func TestFindTopologyAssignment(t *testing.T) { }, }, }, - "don't double-count TAS pods when computing the capacity": { - nodes: []corev1.Node{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "x1", - Labels: map[string]string{ - tasHostLabel: "x1", - }, - }, - Status: corev1.NodeStatus{ - Allocatable: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("1"), - corev1.ResourceMemory: resource.MustParse("1Gi"), - }, - }, - }, - }, - pods: []corev1.Pod{ - *testingpod.MakePod("test-tas", "test-ns").NodeName("x1"). - Request(corev1.ResourceCPU, "400m"). - NodeSelector(tasHostLabel, "x1"). - Label(kueuealpha.TASLabel, "true"). - StatusPhase(corev1.PodRunning). - Obj(), - }, - request: kueue.PodSetTopologyRequest{ - Required: ptr.To(tasHostLabel), - }, - levels: defaultOneLevel, - requests: resources.Requests{ - corev1.ResourceCPU: 600, - }, - count: 1, - wantAssignment: &kueue.TopologyAssignment{ - Levels: defaultOneLevel, - Domains: []kueue.TopologyDomainAssignment{ - { - Count: 1, - Values: []string{ - "x1", - }, - }, - }, - }, - }, "include usage from pending scheduled non-TAS pods, blocked assignment": { // there is not enough free capacity on the only node x1 nodes: []corev1.Node{ @@ -995,23 +946,6 @@ func TestFindTopologyAssignment(t *testing.T) { tasCache := NewTASCache(client) tasFlavorCache := tasCache.NewTASFlavorCache(tc.levels, tc.nodeLabels) - // account for usage from TAS pods for the need of the unit tests. - // note that in practice the usage is accounted based on TAS - // Workloads. 
- for _, pod := range tc.pods { - if _, ok := pod.Labels[kueuealpha.TASLabel]; ok { - levelValues := utiltas.LevelValues(tc.levels, pod.Spec.NodeSelector) - requests := limitrange.TotalRequests(&pod.Spec) - usage := resources.NewRequests(requests) - tasFlavorCache.addUsage([]workload.TopologyDomainRequests{ - { - Values: levelValues, - Requests: usage, - }, - }) - } - } - snapshot, err := tasFlavorCache.snapshot(ctx) if err != nil { t.Fatalf("failed to build the snapshot: %v", err) diff --git a/pkg/controller/tas/resource_flavor.go b/pkg/controller/tas/resource_flavor.go index 0e203fdd88..eebd9851e4 100644 --- a/pkg/controller/tas/resource_flavor.go +++ b/pkg/controller/tas/resource_flavor.go @@ -39,6 +39,7 @@ import ( "sigs.k8s.io/kueue/pkg/cache" "sigs.k8s.io/kueue/pkg/controller/core" "sigs.k8s.io/kueue/pkg/queue" + utiltas "sigs.k8s.io/kueue/pkg/util/tas" ) const ( @@ -152,7 +153,7 @@ func (r *rfReconciler) Reconcile(ctx context.Context, req reconcile.Request) (re }, &topology); err != nil { return reconcile.Result{}, err } - levels := r.levels(&topology) + levels := utiltas.Levels(&topology) tasInfo := r.tasCache.NewTASFlavorCache(levels, flv.Spec.NodeLabels) r.tasCache.Set(kueue.ResourceFlavorReference(flv.Name), tasInfo) } @@ -221,11 +222,3 @@ func nodeBelongsToFlavor(node *corev1.Node, nodeLabels map[string]string, levels } return true } - -func (r *rfReconciler) levels(topology *kueuealpha.Topology) []string { - result := make([]string, len(topology.Spec.Levels)) - for i, level := range topology.Spec.Levels { - result[i] = level.NodeLabel - } - return result -} diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index fc36d0cdd2..4a5cbd71f8 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -41,6 +41,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client/interceptor" config "sigs.k8s.io/kueue/apis/config/v1beta1" + kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/cache" "sigs.k8s.io/kueue/pkg/constants" @@ -50,7 +51,10 @@ import ( "sigs.k8s.io/kueue/pkg/resources" "sigs.k8s.io/kueue/pkg/scheduler/flavorassigner" "sigs.k8s.io/kueue/pkg/util/routine" + "sigs.k8s.io/kueue/pkg/util/slices" + utiltas "sigs.k8s.io/kueue/pkg/util/tas" utiltesting "sigs.k8s.io/kueue/pkg/util/testing" + testingpod "sigs.k8s.io/kueue/pkg/util/testingjobs/pod" "sigs.k8s.io/kueue/pkg/workload" ) @@ -3858,3 +3862,424 @@ func TestResourcesToReserve(t *testing.T) { }) } } + +func TestScheduleForTAS(t *testing.T) { + const ( + tasHostLabel = "kubernetes.io/hostname" + ) + defaultSingleNode := []corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "x1", + Labels: map[string]string{ + "tas-node": "true", + tasHostLabel: "x1", + }, + }, + Status: corev1.NodeStatus{ + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: resource.MustParse("1Gi"), + }, + }, + }, + } + defaultSingleLevelTopology := kueuealpha.Topology{ + ObjectMeta: metav1.ObjectMeta{ + Name: "tas-single-level", + }, + Spec: kueuealpha.TopologySpec{ + Levels: []kueuealpha.TopologyLevel{ + { + NodeLabel: tasHostLabel, + }, + }, + }, + } + defaultTASFlavor := kueue.ResourceFlavor{ + ObjectMeta: metav1.ObjectMeta{ + Name: "tas-default", + }, + Spec: kueue.ResourceFlavorSpec{ + NodeLabels: map[string]string{ + "tas-node": "true", + }, + TopologyName: ptr.To("tas-single-level"), + }, + } + defaultClusterQueue := *utiltesting.MakeClusterQueue("tas-main"). 
+ ResourceGroup(*utiltesting.MakeFlavorQuotas("tas-default"). + Resource(corev1.ResourceCPU, "50").Obj()). + Obj() + queues := []kueue.LocalQueue{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "tas-main", + }, + Spec: kueue.LocalQueueSpec{ + ClusterQueue: "tas-main", + }, + }, + } + eventIgnoreMessage := cmpopts.IgnoreFields(utiltesting.EventRecord{}, "Message") + cases := map[string]struct { + nodes []corev1.Node + pods []corev1.Pod + topologies []kueuealpha.Topology + resourceFlavors []kueue.ResourceFlavor + clusterQueues []kueue.ClusterQueue + workloads []kueue.Workload + + // wantNewAssignments is a summary of all new admissions in the cache after this cycle. + wantNewAssignments map[string]kueue.Admission + // wantLeft is the workload keys that are left in the queues after this cycle. + wantLeft map[string][]string + // wantInadmissibleLeft is the workload keys that are left in the inadmissible state after this cycle. + wantInadmissibleLeft map[string][]string + // wantEvents asserts on the events, the comparison options are passed by eventCmpOpts + wantEvents []utiltesting.EventRecord + // eventCmpOpts are the comparison options for the events + eventCmpOpts []cmp.Option + }{ + "workload required TAS gets scheduled": { + nodes: defaultSingleNode, + topologies: []kueuealpha.Topology{defaultSingleLevelTopology}, + resourceFlavors: []kueue.ResourceFlavor{defaultTASFlavor}, + clusterQueues: []kueue.ClusterQueue{defaultClusterQueue}, + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("foo", "default"). + Queue("tas-main"). + PodSets(*utiltesting.MakePodSet("one", 1). + RequiredTopologyRequest(tasHostLabel). + Request(corev1.ResourceCPU, "1"). + Obj()). + Obj(), + }, + wantNewAssignments: map[string]kueue.Admission{ + "default/foo": *utiltesting.MakeAdmission("tas-main", "one"). + Assignment(corev1.ResourceCPU, "tas-default", "1000m"). + AssignmentPodCount(1). + TopologyAssignment(&kueue.TopologyAssignment{ + Levels: utiltas.Levels(&defaultSingleLevelTopology), + Domains: []kueue.TopologyDomainAssignment{ + { + Count: 1, + Values: []string{ + "x1", + }, + }, + }, + }).Obj(), + }, + eventCmpOpts: []cmp.Option{eventIgnoreMessage}, + wantEvents: []utiltesting.EventRecord{ + { + Key: types.NamespacedName{Namespace: "default", Name: "foo"}, + Reason: "QuotaReserved", + EventType: corev1.EventTypeNormal, + }, + { + Key: types.NamespacedName{Namespace: "default", Name: "foo"}, + Reason: "Admitted", + EventType: corev1.EventTypeNormal, + }, + }, + }, + "workload does not get scheduled as it does not fit within the node capacity": { + nodes: defaultSingleNode, + topologies: []kueuealpha.Topology{defaultSingleLevelTopology}, + resourceFlavors: []kueue.ResourceFlavor{defaultTASFlavor}, + clusterQueues: []kueue.ClusterQueue{defaultClusterQueue}, + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("foo", "default"). + Queue("tas-main"). + PodSets(*utiltesting.MakePodSet("one", 2). + RequiredTopologyRequest(tasHostLabel). + Request(corev1.ResourceCPU, "1"). + Obj()). 
+ Obj(), + }, + wantInadmissibleLeft: map[string][]string{ + "tas-main": {"default/foo"}, + }, + wantEvents: []utiltesting.EventRecord{ + { + Key: types.NamespacedName{Namespace: "default", Name: "foo"}, + EventType: "Warning", + Reason: "Pending", + Message: "couldn't assign flavors to pod set one: Workload cannot fit within the TAS ResourceFlavor", + }, + }, + }, + "workload does not get scheduled as the node capacity is already used by another TAS workload": { + nodes: defaultSingleNode, + topologies: []kueuealpha.Topology{defaultSingleLevelTopology}, + resourceFlavors: []kueue.ResourceFlavor{defaultTASFlavor}, + clusterQueues: []kueue.ClusterQueue{defaultClusterQueue}, + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("foo", "default"). + Queue("tas-main"). + PodSets(*utiltesting.MakePodSet("one", 1). + RequiredTopologyRequest(tasHostLabel). + Request(corev1.ResourceCPU, "1"). + Obj()). + Obj(), + *utiltesting.MakeWorkload("bar-admitted", "default"). + Queue("tas-main"). + ReserveQuota( + utiltesting.MakeAdmission("tas-main", "one"). + Assignment(corev1.ResourceCPU, "tas-default", "1000m"). + AssignmentPodCount(1). + TopologyAssignment(&kueue.TopologyAssignment{ + Levels: utiltas.Levels(&defaultSingleLevelTopology), + Domains: []kueue.TopologyDomainAssignment{ + { + Count: 1, + Values: []string{ + "x1", + }, + }, + }, + }).Obj(), + ). + Admitted(true). + PodSets(*utiltesting.MakePodSet("one", 1). + RequiredTopologyRequest(tasHostLabel). + Request(corev1.ResourceCPU, "1"). + Obj()). + Obj(), + }, + wantInadmissibleLeft: map[string][]string{ + "tas-main": {"default/foo"}, + }, + wantEvents: []utiltesting.EventRecord{ + { + Key: types.NamespacedName{Namespace: "default", Name: "foo"}, + EventType: "Warning", + Reason: "Pending", + Message: "couldn't assign flavors to pod set one: Workload cannot fit within the TAS ResourceFlavor", + }, + }, + }, + "workload does not get scheduled as the node capacity is already used by a non-TAS pod": { + nodes: defaultSingleNode, + pods: []corev1.Pod{ + *testingpod.MakePod("test-pending", "test-ns").NodeName("x1"). + StatusPhase(corev1.PodPending). + Request(corev1.ResourceCPU, "600m"). + Obj(), + }, + topologies: []kueuealpha.Topology{defaultSingleLevelTopology}, + resourceFlavors: []kueue.ResourceFlavor{defaultTASFlavor}, + clusterQueues: []kueue.ClusterQueue{defaultClusterQueue}, + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("foo", "default"). + Queue("tas-main"). + PodSets(*utiltesting.MakePodSet("one", 1). + RequiredTopologyRequest(tasHostLabel). + Request(corev1.ResourceCPU, "1"). + Obj()). + Obj(), + }, + wantInadmissibleLeft: map[string][]string{ + "tas-main": {"default/foo"}, + }, + wantEvents: []utiltesting.EventRecord{ + { + Key: types.NamespacedName{Namespace: "default", Name: "foo"}, + EventType: "Warning", + Reason: "Pending", + Message: "couldn't assign flavors to pod set one: Workload cannot fit within the TAS ResourceFlavor", + }, + }, + }, + "workload gets scheduled as the usage of TAS pods and workloads is not double-counted": { + nodes: defaultSingleNode, + pods: []corev1.Pod{ + *testingpod.MakePod("test-running", "test-ns").NodeName("x1"). + StatusPhase(corev1.PodRunning). + Request(corev1.ResourceCPU, "400m"). + NodeSelector(tasHostLabel, "x1"). + Label(kueuealpha.TASLabel, "true"). 
+ Obj(), + }, + topologies: []kueuealpha.Topology{defaultSingleLevelTopology}, + resourceFlavors: []kueue.ResourceFlavor{defaultTASFlavor}, + clusterQueues: []kueue.ClusterQueue{defaultClusterQueue}, + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("foo", "default"). + Queue("tas-main"). + PodSets(*utiltesting.MakePodSet("one", 1). + RequiredTopologyRequest(tasHostLabel). + Request(corev1.ResourceCPU, "500m"). + Obj()). + Obj(), + *utiltesting.MakeWorkload("bar-admitted", "default"). + Queue("tas-main"). + ReserveQuota( + utiltesting.MakeAdmission("tas-main", "one"). + Assignment(corev1.ResourceCPU, "tas-default", "400m"). + AssignmentPodCount(1). + TopologyAssignment(&kueue.TopologyAssignment{ + Levels: utiltas.Levels(&defaultSingleLevelTopology), + Domains: []kueue.TopologyDomainAssignment{ + { + Count: 1, + Values: []string{ + "x1", + }, + }, + }, + }).Obj(), + ). + Admitted(true). + PodSets(*utiltesting.MakePodSet("one", 1). + RequiredTopologyRequest(tasHostLabel). + Request(corev1.ResourceCPU, "400m"). + Obj()). + Obj(), + }, + wantNewAssignments: map[string]kueue.Admission{ + "default/foo": *utiltesting.MakeAdmission("tas-main", "one"). + Assignment(corev1.ResourceCPU, "tas-default", "500m"). + AssignmentPodCount(1). + TopologyAssignment(&kueue.TopologyAssignment{ + Levels: utiltas.Levels(&defaultSingleLevelTopology), + Domains: []kueue.TopologyDomainAssignment{ + { + Count: 1, + Values: []string{ + "x1", + }, + }, + }, + }).Obj(), + }, + eventCmpOpts: []cmp.Option{eventIgnoreMessage}, + wantEvents: []utiltesting.EventRecord{ + { + Key: types.NamespacedName{Namespace: "default", Name: "foo"}, + Reason: "QuotaReserved", + EventType: corev1.EventTypeNormal, + }, + { + Key: types.NamespacedName{Namespace: "default", Name: "foo"}, + Reason: "Admitted", + EventType: corev1.EventTypeNormal, + }, + }, + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + features.SetFeatureGateDuringTest(t, features.TopologyAwareScheduling, true) + ctx, _ := utiltesting.ContextWithLog(t) + + clientBuilder := utiltesting.NewClientBuilder(). + WithLists( + &kueue.WorkloadList{Items: tc.workloads}, + &kueuealpha.TopologyList{Items: tc.topologies}, + &corev1.PodList{Items: tc.pods}, + &corev1.NodeList{Items: tc.nodes}, + &kueue.LocalQueueList{Items: queues}). 
+ WithObjects( + &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "default"}}, + ) + cl := clientBuilder.Build() + recorder := &utiltesting.EventRecorder{} + cqCache := cache.New(cl) + qManager := queue.NewManager(cl, cqCache) + topologyByName := slices.ToMap(tc.topologies, func(i int) (string, kueuealpha.Topology) { + return tc.topologies[i].Name, tc.topologies[i] + }) + for _, flavor := range tc.resourceFlavors { + cqCache.AddOrUpdateResourceFlavor(&flavor) + if flavor.Spec.TopologyName != nil { + t := topologyByName[*flavor.Spec.TopologyName] + tasCache := cqCache.TASCache() + levels := utiltas.Levels(&t) + tasFlavorCache := tasCache.NewTASFlavorCache(levels, flavor.Spec.NodeLabels) + tasCache.Set(kueue.ResourceFlavorReference(flavor.Name), tasFlavorCache) + } + } + for _, cq := range tc.clusterQueues { + if err := cqCache.AddClusterQueue(ctx, &cq); err != nil { + t.Fatalf("Inserting clusterQueue %s in cache: %v", cq.Name, err) + } + if err := qManager.AddClusterQueue(ctx, &cq); err != nil { + t.Fatalf("Inserting clusterQueue %s in manager: %v", cq.Name, err) + } + if err := cl.Create(ctx, &cq); err != nil { + t.Fatalf("couldn't create the cluster queue: %v", err) + } + } + for _, q := range queues { + if err := qManager.AddLocalQueue(ctx, &q); err != nil { + t.Fatalf("Inserting queue %s/%s in manager: %v", q.Namespace, q.Name, err) + } + } + initiallyAdmittedWorkloads := sets.New[string]() + for _, w := range tc.workloads { + if workload.IsAdmitted(&w) { + initiallyAdmittedWorkloads.Insert(workload.Key(&w)) + } + } + scheduler := New(qManager, cqCache, cl, recorder) + gotScheduled := make([]string, 0) + var mu sync.Mutex + scheduler.applyAdmission = func(ctx context.Context, w *kueue.Workload) error { + mu.Lock() + gotScheduled = append(gotScheduled, workload.Key(w)) + mu.Unlock() + return nil + } + wg := sync.WaitGroup{} + scheduler.setAdmissionRoutineWrapper(routine.NewWrapper( + func() { wg.Add(1) }, + func() { wg.Done() }, + )) + + ctx, cancel := context.WithTimeout(ctx, queueingTimeout) + go qManager.CleanUpOnContext(ctx) + defer cancel() + + scheduler.schedule(ctx) + wg.Wait() + snapshot, err := cqCache.Snapshot(ctx) + if err != nil { + t.Fatalf("unexpected error while building snapshot: %v", err) + } + gotAssignments := make(map[string]kueue.Admission) + for cqName, c := range snapshot.ClusterQueues { + for name, w := range c.Workloads { + if initiallyAdmittedWorkloads.Has(workload.Key(w.Obj)) { + continue + } + switch { + case !workload.HasQuotaReservation(w.Obj): + t.Fatalf("Workload %s is not admitted by a clusterQueue, but it is found as member of clusterQueue %s in the cache", name, cqName) + case string(w.Obj.Status.Admission.ClusterQueue) != cqName: + t.Fatalf("Workload %s is admitted by clusterQueue %s, but it is found as member of clusterQueue %s in the cache", name, w.Obj.Status.Admission.ClusterQueue, cqName) + default: + gotAssignments[name] = *w.Obj.Status.Admission + } + } + } + if diff := cmp.Diff(tc.wantNewAssignments, gotAssignments, cmpopts.EquateEmpty()); diff != "" { + t.Errorf("Unexpected assigned clusterQueues in cache (-want,+got):\n%s", diff) + } + qDump := qManager.Dump() + if diff := cmp.Diff(tc.wantLeft, qDump, cmpDump...); diff != "" { + t.Errorf("Unexpected elements left in the queue (-want,+got):\n%s", diff) + } + qDumpInadmissible := qManager.DumpInadmissible() + if diff := cmp.Diff(tc.wantInadmissibleLeft, qDumpInadmissible, cmpDump...); diff != "" { + t.Errorf("Unexpected elements left in inadmissible workloads (-want,+got):\n%s", diff) + } 
+ if diff := cmp.Diff(tc.wantEvents, recorder.RecordedEvents, tc.eventCmpOpts...); diff != "" { + t.Errorf("unexpected events (-want/+got):\n%s", diff) + } + }) + } +} diff --git a/pkg/util/tas/tas.go b/pkg/util/tas/tas.go index addb38122c..8f0bc994c9 100644 --- a/pkg/util/tas/tas.go +++ b/pkg/util/tas/tas.go @@ -18,6 +18,8 @@ package tas import ( "strings" + + kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" ) type TopologyDomainID string @@ -41,3 +43,11 @@ func LevelValues(levelKeys []string, objectLabels map[string]string) []string { } return levelValues } + +func Levels(topology *kueuealpha.Topology) []string { + result := make([]string, len(topology.Spec.Levels)) + for i, level := range topology.Spec.Levels { + result[i] = level.NodeLabel + } + return result +} diff --git a/pkg/util/testing/wrappers.go b/pkg/util/testing/wrappers.go index 94acd0813e..05d07ac58e 100644 --- a/pkg/util/testing/wrappers.go +++ b/pkg/util/testing/wrappers.go @@ -398,6 +398,20 @@ func (p *PodSetWrapper) RuntimeClass(name string) *PodSetWrapper { return p } +func (p *PodSetWrapper) RequiredTopologyRequest(level string) *PodSetWrapper { + p.TopologyRequest = &kueue.PodSetTopologyRequest{ + Required: ptr.To(level), + } + return p +} + +func (p *PodSetWrapper) PreferredTopologyRequest(level string) *PodSetWrapper { + p.TopologyRequest = &kueue.PodSetTopologyRequest{ + Preferred: ptr.To(level), + } + return p +} + func (p *PodSetWrapper) Obj() *kueue.PodSet { return &p.PodSet }
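
For reference, a minimal standalone sketch of how the relocated utiltas.Levels helper composes with the pre-existing utiltas.LevelValues, mirroring the single-level topology used in the tests above. The main() scaffolding, object name, and printed output are illustrative assumptions; the types and function signatures come from this diff and from pkg/util/tas/tas.go.

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
	utiltas "sigs.k8s.io/kueue/pkg/util/tas"
)

func main() {
	// A single-level Topology analogous to defaultSingleLevelTopology in the
	// scheduler test above; the object name is illustrative.
	topology := kueuealpha.Topology{
		ObjectMeta: metav1.ObjectMeta{Name: "tas-single-level"},
		Spec: kueuealpha.TopologySpec{
			Levels: []kueuealpha.TopologyLevel{
				{NodeLabel: "kubernetes.io/hostname"},
			},
		},
	}

	// Levels extracts the ordered node-label keys from the Topology spec; the
	// resource-flavor reconciler and the scheduler tests now share this helper
	// instead of the removed private rfReconciler.levels method.
	levels := utiltas.Levels(&topology)
	fmt.Println(levels) // [kubernetes.io/hostname]

	// LevelValues (already in pkg/util/tas) resolves the values of those keys
	// from an object's labels, e.g. a pod's node selector.
	values := utiltas.LevelValues(levels, map[string]string{
		"kubernetes.io/hostname": "x1",
	})
	fmt.Println(values) // [x1]
}

Moving the helper into pkg/util/tas lets both the resource-flavor controller and the new TestScheduleForTAS cases derive level keys from a Topology without duplicating the loop.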