Skip to content

Commit

Permalink
Merge pull request kubernetes#514 from alculquicondor/preemption
Browse files Browse the repository at this point in the history
Preemption within ClusterQueue and cohort
  • Loading branch information
k8s-ci-robot authored Jan 30, 2023
2 parents a797233 + ba9dcdf commit c2d2dc6
Show file tree
Hide file tree
Showing 20 changed files with 2,005 additions and 49 deletions.
57 changes: 57 additions & 0 deletions apis/kueue/v1alpha2/clusterqueue_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,23 @@ type ClusterQueueSpec struct {
// Defaults to null which is a nothing selector (no namespaces eligible).
// If set to an empty selector `{}`, then all namespaces are eligible.
NamespaceSelector *metav1.LabelSelector `json:"namespaceSelector,omitempty"`

// preemption describes policies to preempt Workloads from this ClusterQueue
// or the ClusterQueue's cohort.
//
// Preemption can happen in two scenarios:
//
// - When a Workload fits within the min quota of the ClusterQueue, but the
// quota is currently borrowed by other ClusterQueues in the cohort.
// Preempting Workloads in other ClusterQueues allows this ClusterQueue to
// reclaim its min quota.
// - When a Workload doesn't fit within the min quota of the ClusterQueue
// and there are active Workloads with lower priority.
//
// The preemption algorithm tries to find a minimal set of Workloads to
// preempt to accommodate the pending Workload, preempting Workloads with
// lower priority first.
Preemption *ClusterQueuePreemption `json:"preemption,omitempty"`
}

type QueueingStrategy string
Expand Down Expand Up @@ -272,6 +289,46 @@ type Usage struct {
Borrowed *resource.Quantity `json:"borrowing,omitempty"`
}

// PreemptionPolicy names a rule that decides which Workloads a pending
// Workload is allowed to preempt. It is the value type of the policy fields
// in ClusterQueuePreemption.
type PreemptionPolicy string

const (
// PreemptionPolicyNever disables preemption for the scope it is applied to.
PreemptionPolicyNever PreemptionPolicy = "Never"
// PreemptionPolicyAny allows preempting any Workload, irrespective of
// priority. It is only listed as a valid value for reclaiming quota within
// the cohort, not for preemption within the ClusterQueue.
PreemptionPolicyAny PreemptionPolicy = "Any"
// PreemptionPolicyLowerPriority allows preempting only Workloads that have
// lower priority than the pending Workload.
PreemptionPolicyLowerPriority PreemptionPolicy = "LowerPriority"
)

// ClusterQueuePreemption contains policies to preempt Workloads from this
// ClusterQueue or the ClusterQueue's cohort.
type ClusterQueuePreemption struct {
// NOTE(review): the Go field is ReclaimWithinCohort and its doc comment
// starts with `reclaimWithinCohort`, but the serialized JSON key is
// `withinCohort`. Confirm which name is intended; the key is part of the
// wire format, so it should be settled before the API is released.

// reclaimWithinCohort determines whether a pending Workload can preempt
// Workloads from other ClusterQueues in the cohort that are using more than
// their min quota. Possible values are:
//
// - `Never` (default): do not preempt workloads in the cohort.
// - `LowerPriority`: if the pending workload fits within the min
// quota of its ClusterQueue, only preempt workloads in the cohort that have
// lower priority than the pending Workload.
// - `Any`: if the pending workload fits within the min quota of its
// ClusterQueue, preempt any workload in the cohort, irrespective of
// priority.
//
// +kubebuilder:default=Never
// +kubebuilder:validation:Enum=Never;LowerPriority;Any
ReclaimWithinCohort PreemptionPolicy `json:"withinCohort,omitempty"`

// withinClusterQueue determines whether a pending workload that doesn't fit
// within the min quota for its ClusterQueue, can preempt active Workloads in
// the ClusterQueue. Possible values are:
//
// - `Never` (default): do not preempt workloads in the ClusterQueue.
// - `LowerPriority`: only preempt workloads in the ClusterQueue that have
// lower priority than the pending Workload.
//
// +kubebuilder:default=Never
// +kubebuilder:validation:Enum=Never;LowerPriority
WithinClusterQueue PreemptionPolicy `json:"withinClusterQueue,omitempty"`
}

//+kubebuilder:object:root=true
//+kubebuilder:resource:scope=Cluster,shortName={cq}
//+kubebuilder:subresource:status
Expand Down
20 changes: 20 additions & 0 deletions apis/kueue/v1alpha2/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions apis/kueue/webhooks/clusterqueue_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ func (w *ClusterQueueWebhook) Default(ctx context.Context, obj runtime.Object) e
if !controllerutil.ContainsFinalizer(cq, kueue.ResourceInUseFinalizerName) {
controllerutil.AddFinalizer(cq, kueue.ResourceInUseFinalizerName)
}
if cq.Spec.Preemption == nil {
cq.Spec.Preemption = &kueue.ClusterQueuePreemption{
WithinClusterQueue: kueue.PreemptionPolicyNever,
ReclaimWithinCohort: kueue.PreemptionPolicyNever,
}
}
return nil
}

Expand Down
43 changes: 43 additions & 0 deletions config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,49 @@ spec:
type: object
type: object
x-kubernetes-map-type: atomic
preemption:
description: "preemption describes policies to preempt Workloads from
this ClusterQueue or the ClusterQueue's cohort. \n Preemption can
happen in two scenarios: \n - When a Workload fits within the min
quota of the ClusterQueue, but the quota is currently borrowed by
other ClusterQueues in the cohort. Preempting Workloads in other
ClusterQueues allows this ClusterQueue to reclaim its min quota.
- When a Workload doesn't fit within the min quota of the ClusterQueue
and there are active Workloads with lower priority. \n The preemption
algorithm tries to find a minimal set of Workloads to preempt to
accommodate the pending Workload, preempting Workloads with lower
priority first."
properties:
withinClusterQueue:
default: Never
description: "withinClusterQueue determines whether a pending
workload that doesn't fit within the min quota for its ClusterQueue,
can preempt active Workloads in the ClusterQueue. Possible values
are: \n - `Never` (default): do not preempt workloads in the
ClusterQueue. - `LowerPriority`: only preempt workloads in the
ClusterQueue that have lower priority than the pending Workload."
enum:
- Never
- LowerPriority
type: string
withinCohort:
default: Never
description: "reclaimWithinCohort determines whether a pending
Workload can preempt Workloads from other ClusterQueues in the
cohort that are using more than their min quota. Possible values
are: \n - `Never` (default): do not preempt workloads in the
cohort. - `LowerPriority`: if the pending workload fits within
the min quota of its ClusterQueue, only preempt workloads in
the cohort that have lower priority than the pending Workload.
- `Any`: if the pending workload fits within the min quota of
its ClusterQueue, preempt any workload in the cohort, irrespective
of priority."
enum:
- Never
- LowerPriority
- Any
type: string
type: object
queueingStrategy:
default: BestEffortFIFO
description: "QueueingStrategy indicates the queueing strategy of
Expand Down
27 changes: 22 additions & 5 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ type ClusterQueue struct {
Workloads map[string]*workload.Info
WorkloadsNotReady sets.Set[string]
NamespaceSelector labels.Selector
Preemption kueue.ClusterQueuePreemption
// The set of key labels from all flavors of a resource.
// Those keys define the affinity terms of a workload
// that can be matched against the flavors.
Expand Down Expand Up @@ -250,6 +251,11 @@ func (c *ClusterQueue) Active() bool {
return c.Status == active
}

// defaultPreemption is the preemption configuration that
// (*ClusterQueue).update stores in the cache when the incoming ClusterQueue
// spec carries no preemption section. Both policies are Never, i.e. no
// preemption within the cohort or within the ClusterQueue.
var defaultPreemption = kueue.ClusterQueuePreemption{
ReclaimWithinCohort: kueue.PreemptionPolicyNever,
WithinClusterQueue: kueue.PreemptionPolicyNever,
}

func (c *ClusterQueue) update(in *kueue.ClusterQueue, resourceFlavors map[string]*kueue.ResourceFlavor) error {
c.RequestableResources = resourcesByName(in.Spec.Resources)
c.UpdateCodependentResources()
Expand All @@ -274,6 +280,13 @@ func (c *ClusterQueue) update(in *kueue.ClusterQueue, resourceFlavors map[string
}
c.UsedResources = usedResources
c.UpdateWithFlavors(resourceFlavors)

if in.Spec.Preemption != nil {
c.Preemption = *in.Spec.Preemption
} else {
c.Preemption = defaultPreemption
}

return nil
}

Expand Down Expand Up @@ -372,21 +385,25 @@ func (c *ClusterQueue) deleteWorkload(w *kueue.Workload) {
}

// updateWorkloadUsage applies the workload's requests, scaled by m, to this
// ClusterQueue's UsedResources and adjusts the admitted-workload counter of
// the workload's queue by m.
//
// The counter is only touched when the queue already has an entry in
// admittedWorkloadsPerQueue; unknown queues are left alone.
// NOTE(review): callers presumably pass m = +1 when adding and m = -1 when
// removing a workload — confirm against the call sites.
func (c *ClusterQueue) updateWorkloadUsage(wi *workload.Info, m int64) {
	updateUsage(wi, c.UsedResources, m)

	queueKey := workload.QueueKey(wi.Obj)
	if _, tracked := c.admittedWorkloadsPerQueue[queueKey]; !tracked {
		return
	}
	c.admittedWorkloadsPerQueue[queueKey] += int(m)
}

// updateUsage adds the workload's requests, multiplied by m, to the matching
// entries of usedResources, mutating the inner maps in place.
//
// For every pod set in wi.TotalRequests and every resource that has a flavor
// assigned, the requested amount is applied only when all of the following
// hold: the pod set has a request for the resource, usedResources has an
// entry for the resource, and that entry already tracks the assigned flavor.
// Anything else is skipped; no new keys are ever created.
//
// This function deliberately has no per-queue bookkeeping: it has no
// ClusterQueue receiver, and the admitted-workload counter is maintained by
// (*ClusterQueue).updateWorkloadUsage. Updating it here as well would count
// each workload twice.
// NOTE(review): m is presumably +1 to add usage and -1 to release it —
// confirm against the call sites.
func updateUsage(wi *workload.Info, usedResources ResourceQuantities, m int64) {
	for _, ps := range wi.TotalRequests {
		for wlRes, wlResFlv := range ps.Flavors {
			v, wlResExist := ps.Requests[wlRes]
			cqResFlv, cqResExist := usedResources[wlRes]
			if !cqResExist || !wlResExist {
				continue
			}
			// Only adjust flavors that are already tracked for this resource.
			if _, cqFlvExist := cqResFlv[wlResFlv]; cqFlvExist {
				cqResFlv[wlResFlv] += v * m
			}
		}
	}
}

func (c *ClusterQueue) addLocalQueue(q *kueue.LocalQueue) error {
Expand Down
Loading

0 comments on commit c2d2dc6

Please sign in to comment.