Skip to content

Commit

Permalink
TAS: validation for CQs using TAS (#3320)
Browse files Browse the repository at this point in the history
* TAS: soft validation for ClusterQueues

* Review remarks
  • Loading branch information
mimowo authored Oct 29, 2024
1 parent e59b315 commit 038a563
Show file tree
Hide file tree
Showing 14 changed files with 210 additions and 31 deletions.
1 change: 1 addition & 0 deletions apis/kueue/v1beta1/clusterqueue_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const (
ClusterQueueActiveReasonFlavorIndependentAdmissionCheckAppliedPerFlavor = "FlavorIndependentAdmissionCheckAppliedPerFlavor"
ClusterQueueActiveReasonMultipleMultiKueueAdmissionChecks = "MultipleMultiKueueAdmissionChecks"
ClusterQueueActiveReasonMultiKueueAdmissionCheckAppliedPerFlavor = "MultiKueueAdmissionCheckAppliedPerFlavor"
ClusterQueueActiveReasonNotSupportedWithTopologyAwareScheduling = "NotSupportedWithTopologyAwareScheduling"
ClusterQueueActiveReasonUnknown = "Unknown"
ClusterQueueActiveReasonReady = "Ready"
)
Expand Down
6 changes: 6 additions & 0 deletions apis/kueue/v1beta1/provisioningrequestconfig_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
// ProvisioningRequestControllerName is the name used by the Provisioning
// Request admission check controller.
ProvisioningRequestControllerName = "kueue.x-k8s.io/provisioning-request"
)

// ProvisioningRequestConfigSpec defines the desired state of ProvisioningRequestConfig
type ProvisioningRequestConfigSpec struct {
// ProvisioningClassName describes the different modes of provisioning the resources.
Expand Down
52 changes: 49 additions & 3 deletions pkg/cache/clusterqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ type clusterQueue struct {
multipleSingleInstanceControllersChecks map[string][]string // key = controllerName
flavorIndependentAdmissionCheckAppliedPerFlavor []string
multiKueueAdmissionChecks []string
provisioningAdmissionChecks []string
perFlavorMultiKueueAdmissionChecks []string
tasFlavors []kueue.ResourceFlavorReference
admittedWorkloadsCount int
isStopped bool
workloadInfoOptions []workload.InfoOption
Expand Down Expand Up @@ -236,6 +238,7 @@ func (c *clusterQueue) updateQueueStatus() {
len(c.inactiveAdmissionChecks) > 0 ||
len(c.multipleSingleInstanceControllersChecks) > 0 ||
len(c.flavorIndependentAdmissionCheckAppliedPerFlavor) > 0 ||
c.isTASViolated() ||
// one multikueue admission check is allowed
len(c.multiKueueAdmissionChecks) > 1 ||
len(c.perFlavorMultiKueueAdmissionChecks) > 0 {
Expand Down Expand Up @@ -297,6 +300,25 @@ func (c *clusterQueue) inactiveReason() (string, string) {
messages = append(messages, fmt.Sprintf("AdmissionCheck(s): %v cannot be set at flavor level", c.flavorIndependentAdmissionCheckAppliedPerFlavor))
}

if features.Enabled(features.TopologyAwareScheduling) && len(c.tasFlavors) > 0 {
if c.HasParent() {
reasons = append(reasons, kueue.ClusterQueueActiveReasonNotSupportedWithTopologyAwareScheduling)
messages = append(messages, "TAS is not supported for cohorts")
}
if c.Preemption.WithinClusterQueue != kueue.PreemptionPolicyNever {
reasons = append(reasons, kueue.ClusterQueueActiveReasonNotSupportedWithTopologyAwareScheduling)
messages = append(messages, "TAS is not supported for preemption within cluster queue")
}
if len(c.multiKueueAdmissionChecks) > 0 {
reasons = append(reasons, kueue.ClusterQueueActiveReasonNotSupportedWithTopologyAwareScheduling)
messages = append(messages, "TAS is not supported with MultiKueue admission check")
}
if len(c.provisioningAdmissionChecks) > 0 {
reasons = append(reasons, kueue.ClusterQueueActiveReasonNotSupportedWithTopologyAwareScheduling)
messages = append(messages, "TAS is not supported with ProvisioningRequest admission check")
}
}

if len(reasons) == 0 {
return kueue.ClusterQueueActiveReasonUnknown, "Can't admit new workloads."
}
Expand All @@ -306,6 +328,16 @@ func (c *clusterQueue) inactiveReason() (string, string) {
return kueue.ClusterQueueActiveReasonReady, "Can admit new workloads"
}

func (c *clusterQueue) isTASViolated() bool {
if !features.Enabled(features.TopologyAwareScheduling) || len(c.tasFlavors) == 0 {
return false
}
return c.HasParent() ||
c.Preemption.WithinClusterQueue != kueue.PreemptionPolicyNever ||
len(c.multiKueueAdmissionChecks) > 0 ||
len(c.provisioningAdmissionChecks) > 0
}

// UpdateWithFlavors updates a ClusterQueue based on the passed ResourceFlavors set.
// Exported only for testing.
func (c *clusterQueue) UpdateWithFlavors(flavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor) {
Expand All @@ -315,6 +347,7 @@ func (c *clusterQueue) UpdateWithFlavors(flavors map[kueue.ResourceFlavorReferen

func (c *clusterQueue) updateLabelKeys(flavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor) {
c.missingFlavors = nil
c.tasFlavors = nil
for i := range c.ResourceGroups {
rg := &c.ResourceGroups[i]
if len(rg.Flavors) == 0 {
Expand All @@ -327,6 +360,9 @@ func (c *clusterQueue) updateLabelKeys(flavors map[kueue.ResourceFlavorReference
for k := range flv.Spec.NodeLabels {
keys.Insert(k)
}
if flv.Spec.TopologyName != nil {
c.tasFlavors = append(c.tasFlavors, fName)
}
} else {
c.missingFlavors = append(c.missingFlavors, fName)
}
Expand All @@ -343,6 +379,7 @@ func (c *clusterQueue) updateWithAdmissionChecks(checks map[string]AdmissionChec
checksPerController := make(map[string][]string, len(c.AdmissionChecks))
singleInstanceControllers := sets.New[string]()
multiKueueAdmissionChecks := sets.New[string]()
provisioningAdmissionChecks := sets.New[string]()
var missing []string
var inactive []string
var flavorIndependentCheckOnFlavors []string
Expand All @@ -362,6 +399,9 @@ func (c *clusterQueue) updateWithAdmissionChecks(checks map[string]AdmissionChec
flavorIndependentCheckOnFlavors = append(flavorIndependentCheckOnFlavors, acName)
}

if ac.Controller == kueue.ProvisioningRequestControllerName {
provisioningAdmissionChecks.Insert(acName)
}
if ac.Controller == kueue.MultiKueueControllerName {
// MultiKueue Admission Checks has extra constraints:
// - cannot use multiple MultiKueue AdmissionChecks on the same ClusterQueue
Expand All @@ -379,7 +419,8 @@ func (c *clusterQueue) updateWithAdmissionChecks(checks map[string]AdmissionChec
slices.Sort(inactive)
slices.Sort(flavorIndependentCheckOnFlavors)
slices.Sort(perFlavorMultiKueueChecks)
multiKueueAdmissionChecksList := sets.List(multiKueueAdmissionChecks)
multiKueueChecks := sets.List(multiKueueAdmissionChecks)
provisioningChecks := sets.List(provisioningAdmissionChecks)

update := false
if !slices.Equal(c.missingAdmissionChecks, missing) {
Expand Down Expand Up @@ -414,8 +455,13 @@ func (c *clusterQueue) updateWithAdmissionChecks(checks map[string]AdmissionChec
}
}

if !slices.Equal(c.multiKueueAdmissionChecks, multiKueueAdmissionChecksList) {
c.multiKueueAdmissionChecks = multiKueueAdmissionChecksList
if !slices.Equal(c.multiKueueAdmissionChecks, multiKueueChecks) {
c.multiKueueAdmissionChecks = multiKueueChecks
update = true
}

if !slices.Equal(c.provisioningAdmissionChecks, provisioningChecks) {
c.provisioningAdmissionChecks = provisioningChecks
update = true
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ var _ reconcile.Reconciler = (*acReconciler)(nil)

func (a *acReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
ac := &kueue.AdmissionCheck{}
if err := a.client.Get(ctx, req.NamespacedName, ac); err != nil || ac.Spec.ControllerName != ControllerName {
if err := a.client.Get(ctx, req.NamespacedName, ac); err != nil || ac.Spec.ControllerName != kueue.ProvisioningRequestControllerName {
return reconcile.Result{}, client.IgnoreNotFound(err)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestReconcileAdmissionCheck(t *testing.T) {
},
"no parameters specified": {
check: utiltesting.MakeAdmissionCheck("check1").
ControllerName(ControllerName).
ControllerName(kueue.ProvisioningRequestControllerName).
Generation(1).
Obj(),
wantCondition: &metav1.Condition{
Expand All @@ -56,7 +56,7 @@ func TestReconcileAdmissionCheck(t *testing.T) {
"bad ref group": {
check: utiltesting.MakeAdmissionCheck("check1").
Parameters("bad.group", ConfigKind, "config1").
ControllerName(ControllerName).
ControllerName(kueue.ProvisioningRequestControllerName).
Generation(1).
Obj(),
wantCondition: &metav1.Condition{
Expand All @@ -70,7 +70,7 @@ func TestReconcileAdmissionCheck(t *testing.T) {
"bad ref kind": {
check: utiltesting.MakeAdmissionCheck("check1").
Parameters(kueue.GroupVersion.Group, "BadKind", "config1").
ControllerName(ControllerName).
ControllerName(kueue.ProvisioningRequestControllerName).
Generation(1).
Obj(),
wantCondition: &metav1.Condition{
Expand All @@ -84,7 +84,7 @@ func TestReconcileAdmissionCheck(t *testing.T) {
"config missing": {
check: utiltesting.MakeAdmissionCheck("check1").
Parameters(kueue.GroupVersion.Group, ConfigKind, "config1").
ControllerName(ControllerName).
ControllerName(kueue.ProvisioningRequestControllerName).
Generation(1).
Obj(),
wantCondition: &metav1.Condition{
Expand All @@ -98,7 +98,7 @@ func TestReconcileAdmissionCheck(t *testing.T) {
"config found": {
check: utiltesting.MakeAdmissionCheck("check1").
Parameters(kueue.GroupVersion.Group, ConfigKind, "config1").
ControllerName(ControllerName).
ControllerName(kueue.ProvisioningRequestControllerName).
Generation(1).
Obj(),
configs: []kueue.ProvisioningRequestConfig{
Expand Down
1 change: 0 additions & 1 deletion pkg/controller/admissionchecks/provisioning/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package provisioning

const (
ConfigKind = "ProvisioningRequestConfig"
ControllerName = "kueue.x-k8s.io/provisioning-request"
DeprecatedConsumesAnnotationKey = "cluster-autoscaler.kubernetes.io/consume-provisioning-request"
DeprecatedClassNameAnnotationKey = "cluster-autoscaler.kubernetes.io/provisioning-class-name"
ConsumesAnnotationKey = "autoscaling.x-k8s.io/consume-provisioning-request"
Expand Down
10 changes: 5 additions & 5 deletions pkg/controller/admissionchecks/provisioning/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco
}

// get the lists of relevant checks
relevantChecks, err := admissioncheck.FilterForController(ctx, c.client, wl.Status.AdmissionChecks, ControllerName)
relevantChecks, err := admissioncheck.FilterForController(ctx, c.client, wl.Status.AdmissionChecks, kueue.ProvisioningRequestControllerName)
if err != nil {
return reconcile.Result{}, err
}
Expand Down Expand Up @@ -586,7 +586,7 @@ func (c *Controller) syncCheckStates(ctx context.Context, wl *kueue.Workload, ch
workload.SetAdmissionCheckState(&wlPatch.Status.AdmissionChecks, checkState)
}
if updated {
if err := c.client.Status().Patch(ctx, wlPatch, client.Apply, client.FieldOwner(ControllerName), client.ForceOwnership); err != nil {
if err := c.client.Status().Patch(ctx, wlPatch, client.Apply, client.FieldOwner(kueue.ProvisioningRequestControllerName), client.ForceOwnership); err != nil {
return err
}
for i := range recorderMessages {
Expand Down Expand Up @@ -625,7 +625,7 @@ func (a *acHandler) Create(ctx context.Context, event event.CreateEvent, q workq
return
}

if ac.Spec.ControllerName == ControllerName {
if ac.Spec.ControllerName == kueue.ProvisioningRequestControllerName {
err := a.reconcileWorkloadsUsing(ctx, ac.Name, q)
if err != nil {
ctrl.LoggerFrom(ctx).V(5).Error(err, "Failure on create event", "admissionCheck", klog.KObj(ac))
Expand All @@ -640,7 +640,7 @@ func (a *acHandler) Update(ctx context.Context, event event.UpdateEvent, q workq
return
}

if oldAc.Spec.ControllerName == ControllerName || newAc.Spec.ControllerName == ControllerName {
if oldAc.Spec.ControllerName == kueue.ProvisioningRequestControllerName || newAc.Spec.ControllerName == kueue.ProvisioningRequestControllerName {
err := a.reconcileWorkloadsUsing(ctx, oldAc.Name, q)
if err != nil {
ctrl.LoggerFrom(ctx).V(5).Error(err, "Failure on update event", "admissionCheck", klog.KObj(oldAc))
Expand All @@ -654,7 +654,7 @@ func (a *acHandler) Delete(ctx context.Context, event event.DeleteEvent, q workq
return
}

if ac.Spec.ControllerName == ControllerName {
if ac.Spec.ControllerName == kueue.ProvisioningRequestControllerName {
err := a.reconcileWorkloadsUsing(ctx, ac.Name, q)
if err != nil {
ctrl.LoggerFrom(ctx).V(5).Error(err, "Failure on delete event", "admissionCheck", klog.KObj(ac))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func TestReconcile(t *testing.T) {
}

baseCheck := utiltesting.MakeAdmissionCheck("check1").
ControllerName(ControllerName).
ControllerName(kueue.ProvisioningRequestControllerName).
Parameters(kueue.GroupVersion.Group, ConfigKind, "config1").
Obj()

Expand Down Expand Up @@ -1370,7 +1370,7 @@ func TestActiveOrLastPRForChecks(t *testing.T) {
pr2Created.Name = "wl-check-2"

baseCheck := utiltesting.MakeAdmissionCheck("check").
ControllerName(ControllerName).
ControllerName(kueue.ProvisioningRequestControllerName).
Parameters(kueue.GroupVersion.Group, ConfigKind, "config1").
Obj()

Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/admissionchecks/provisioning/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func SetupIndexer(ctx context.Context, indexer client.FieldIndexer) error {
return fmt.Errorf("setting index on workloads checks: %w", err)
}

if err := indexer.IndexField(ctx, &kueue.AdmissionCheck{}, AdmissionCheckUsingConfigKey, admissioncheck.IndexerByConfigFunction(ControllerName, configGVK)); err != nil {
if err := indexer.IndexField(ctx, &kueue.AdmissionCheck{}, AdmissionCheckUsingConfigKey, admissioncheck.IndexerByConfigFunction(kueue.ProvisioningRequestControllerName, configGVK)); err != nil {
return fmt.Errorf("setting index on admission checks config: %w", err)
}
return nil
Expand Down
14 changes: 7 additions & 7 deletions pkg/controller/admissionchecks/provisioning/indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func TestIndexAdmissionChecks(t *testing.T) {
"bad ref group": {
checks: []*kueue.AdmissionCheck{
utiltesting.MakeAdmissionCheck("check1").
ControllerName(ControllerName).
ControllerName(kueue.ProvisioningRequestControllerName).
Parameters("core", ConfigKind, "config1").
Obj(),
},
Expand All @@ -297,7 +297,7 @@ func TestIndexAdmissionChecks(t *testing.T) {
"bad ref kind": {
checks: []*kueue.AdmissionCheck{
utiltesting.MakeAdmissionCheck("check1").
ControllerName(ControllerName).
ControllerName(kueue.ProvisioningRequestControllerName).
Parameters(kueue.GroupVersion.Group, "kind", "config1").
Obj(),
},
Expand All @@ -306,7 +306,7 @@ func TestIndexAdmissionChecks(t *testing.T) {
"empty name": {
checks: []*kueue.AdmissionCheck{
utiltesting.MakeAdmissionCheck("check1").
ControllerName(ControllerName).
ControllerName(kueue.ProvisioningRequestControllerName).
Parameters(kueue.GroupVersion.Group, ConfigKind, "").
Obj(),
},
Expand All @@ -315,7 +315,7 @@ func TestIndexAdmissionChecks(t *testing.T) {
"match": {
checks: []*kueue.AdmissionCheck{
utiltesting.MakeAdmissionCheck("check1").
ControllerName(ControllerName).
ControllerName(kueue.ProvisioningRequestControllerName).
Parameters(kueue.GroupVersion.Group, ConfigKind, "config1").
Obj(),
},
Expand All @@ -325,15 +325,15 @@ func TestIndexAdmissionChecks(t *testing.T) {
"multiple checks, partial match": {
checks: []*kueue.AdmissionCheck{
utiltesting.MakeAdmissionCheck("check1").
ControllerName(ControllerName).
ControllerName(kueue.ProvisioningRequestControllerName).
Parameters(kueue.GroupVersion.Group, ConfigKind, "config1").
Obj(),
utiltesting.MakeAdmissionCheck("check2").
ControllerName(ControllerName).
ControllerName(kueue.ProvisioningRequestControllerName).
Parameters(kueue.GroupVersion.Group, ConfigKind, "config1").
Obj(),
utiltesting.MakeAdmissionCheck("check3").
ControllerName(ControllerName).
ControllerName(kueue.ProvisioningRequestControllerName).
Parameters(kueue.GroupVersion.Group, ConfigKind, "config2").
Obj(),
},
Expand Down
5 changes: 2 additions & 3 deletions test/e2e/singlecluster/tas_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,10 @@ var _ = ginkgo.Describe("TopologyAwareSchedling", func() {
Resource(corev1.ResourceMemory, "1Gi").
Obj(),
).
Preemption(kueue.ClusterQueuePreemption{
WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
}).
Obj()
gomega.Expect(k8sClient.Create(ctx, clusterQueue)).Should(gomega.Succeed())
util.ExpectClusterQueuesToBeActive(ctx, k8sClient, clusterQueue)

localQueue = testing.MakeLocalQueue("main", ns.Name).ClusterQueue("cluster-queue").Obj()
gomega.Expect(k8sClient.Create(ctx, localQueue)).Should(gomega.Succeed())
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ var _ = ginkgo.Describe("Provisioning", ginkgo.Ordered, ginkgo.ContinueOnFailure
gomega.Expect(k8sClient.Create(ctx, prc2)).To(gomega.Succeed())

ac = testing.MakeAdmissionCheck("ac-prov").
ControllerName(provisioning.ControllerName).
ControllerName(kueue.ProvisioningRequestControllerName).
Parameters(kueue.GroupVersion.Group, "ProvisioningRequestConfig", prc.Name).
Obj()
gomega.Expect(k8sClient.Create(ctx, ac)).To(gomega.Succeed())
Expand Down Expand Up @@ -971,7 +971,7 @@ var _ = ginkgo.Describe("Provisioning", ginkgo.Ordered, ginkgo.ContinueOnFailure
gomega.Expect(k8sClient.Create(ctx, prc)).To(gomega.Succeed())

ac = testing.MakeAdmissionCheck("ac-prov").
ControllerName(provisioning.ControllerName).
ControllerName(kueue.ProvisioningRequestControllerName).
Parameters(kueue.GroupVersion.Group, "ProvisioningRequestConfig", prc.Name).
Obj()
gomega.Expect(k8sClient.Create(ctx, ac)).To(gomega.Succeed())
Expand Down Expand Up @@ -1214,7 +1214,7 @@ var _ = ginkgo.Describe("Provisioning", ginkgo.Ordered, ginkgo.ContinueOnFailure
gomega.Expect(k8sClient.Create(ctx, prc)).To(gomega.Succeed())

ac = testing.MakeAdmissionCheck("ac-prov").
ControllerName(provisioning.ControllerName).
ControllerName(kueue.ProvisioningRequestControllerName).
Parameters(kueue.GroupVersion.Group, "ProvisioningRequestConfig", prc.Name).
Obj()
gomega.Expect(k8sClient.Create(ctx, ac)).To(gomega.Succeed())
Expand Down
1 change: 1 addition & 0 deletions test/integration/tas/tas_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ var _ = ginkgo.Describe("Topology Aware Scheduling", ginkgo.Ordered, func() {
ResourceGroup(*testing.MakeFlavorQuotas(tasFlavor.Name).Resource(corev1.ResourceCPU, "5").Obj()).
Obj()
gomega.Expect(k8sClient.Create(ctx, clusterQueue)).Should(gomega.Succeed())
util.ExpectClusterQueuesToBeActive(ctx, k8sClient, clusterQueue)

localQueue = testing.MakeLocalQueue("local-queue", ns.Name).ClusterQueue(clusterQueue.Name).Obj()
gomega.Expect(k8sClient.Create(ctx, localQueue)).Should(gomega.Succeed())
Expand Down
Loading

0 comments on commit 038a563

Please sign in to comment.