Skip to content

Commit

Permalink
Plain pod groups implementation
Browse files Browse the repository at this point in the history
* Introduce new ComposableJob interface for jobs which
  has to be composed of different API objects.

* Add custom get. A composable job can get all it's elements at the
  beginning of the reconcile.

* Add ComposableJob implementation for pod groups.

* Add webhook checks for pod group labels and
  annotations.

* Update Finished method for pod group

* IsSuspended and Stop methods of the pod controller now
  interact with all the pods at once.

* Update IsActive function to check if at least one pod in
  the group is running.

* Change podSuspended method.

* Add stop skip for pods in group that already have a
  delition timestamp.

* Add IsComposableJobActive

* Add UnretryableError error, that doesn't require reconcile
  retry.

* Add ValidateLabelAsCRDName call for the pod-group,
  make pod-group label immutable.

* Add unit tests for pod group integration
  • Loading branch information
achernevskii committed Nov 24, 2023
1 parent 1ecd79f commit c264f4c
Show file tree
Hide file tree
Showing 15 changed files with 2,797 additions and 526 deletions.
22 changes: 22 additions & 0 deletions pkg/controller/jobframework/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package jobframework

import "errors"

// UnretryableError is an error that doesn't require reconcile retry
// and will not be returned by the JobReconciler.
func UnretryableError(msg string) error {
return &unretryableError{msg: msg}
}

type unretryableError struct {
msg string
}

func (e *unretryableError) Error() string {
return e.msg
}

func IsUnretryableError(e error) bool {
var unretryableError *unretryableError
return errors.As(e, &unretryableError)
}
16 changes: 8 additions & 8 deletions pkg/controller/jobframework/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package jobframework
import (
"context"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand All @@ -27,15 +26,16 @@ func SetupWorkloadOwnerIndex(ctx context.Context, indexer client.FieldIndexer, g
return indexer.IndexField(ctx, &kueue.Workload{}, getOwnerKey(gvk), func(o client.Object) []string {
// grab the Workload object, extract the owner...
wl := o.(*kueue.Workload)
owner := metav1.GetControllerOf(wl)
if owner == nil {
if len(wl.OwnerReferences) == 0 {
return nil
}
// ...make sure it's the job with matching GVK...
if owner.Kind != gvk.Kind || owner.APIVersion != gvk.GroupVersion().String() {
return nil
owners := make([]string, 0, len(wl.OwnerReferences))
for i := range wl.OwnerReferences {
owner := &wl.OwnerReferences[i]
if owner.Kind == gvk.Kind && owner.APIVersion == gvk.GroupVersion().String() {
owners = append(owners, owner.Name)
}
}
// ...and if so, return it
return []string{owner.Name}
return owners
})
}
13 changes: 13 additions & 0 deletions pkg/controller/jobframework/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
Expand Down Expand Up @@ -84,6 +86,17 @@ type JobWithPriorityClass interface {
PriorityClass() string
}

// ComposableJob interface should be implemented by generic jobs that
// are composed out of multiple API objects.
type ComposableJob interface {
// Load loads all members of the composable job. If removeFinalizers == true, workload and job finalizers should be removed.
Load(ctx context.Context, c client.Client, key types.NamespacedName) (removeFinalizers bool, err error)
// ConstructComposableWorkload returns a new Workload that's assembled out of all members of the ComposableJob.
ConstructComposableWorkload(ctx context.Context, c client.Client, r record.EventRecorder) (*kueue.Workload, error)
// FindMatchingWorkloads returns all related workloads, workload that matches the ComposableJob and duplicates that has to be deleted.
FindMatchingWorkloads(ctx context.Context, c client.Client) (match *kueue.Workload, toDelete []*kueue.Workload, err error)
}

func ParentWorkloadName(job GenericJob) string {
return job.Object().GetAnnotations()[constants.ParentWorkloadAnnotation]
}
Expand Down
144 changes: 106 additions & 38 deletions pkg/controller/jobframework/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"errors"
"fmt"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
Expand All @@ -27,6 +28,7 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand Down Expand Up @@ -132,20 +134,31 @@ func NewReconciler(
}
}

func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Request, job GenericJob) (ctrl.Result, error) {
func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Request, job GenericJob) (result ctrl.Result, err error) {
object := job.Object()
log := ctrl.LoggerFrom(ctx).WithValues("job", req.String(), "gvk", job.GVK())
ctx = ctrl.LoggerInto(ctx, log)

err := r.client.Get(ctx, req.NamespacedName, object)
defer func() {
err = r.ignoreUnretryableError(log, err)
}()

dropFinalizers := false
if cJob, isComposable := job.(ComposableJob); isComposable {
dropFinalizers, err = cJob.Load(ctx, r.client, req.NamespacedName)
} else {
err = r.client.Get(ctx, req.NamespacedName, object)
dropFinalizers = apierrors.IsNotFound(err) || !object.GetDeletionTimestamp().IsZero()
}

if jws, implements := job.(JobWithSkip); implements {
if jws.Skip() {
return ctrl.Result{}, nil
}
}

if apierrors.IsNotFound(err) || !object.GetDeletionTimestamp().IsZero() {
if dropFinalizers {
// Remove workload finalizer
workloads := kueue.WorkloadList{}
if err := r.client.List(ctx, &workloads, client.InNamespace(req.Namespace),
client.MatchingFields{getOwnerKey(job.GVK()): req.Name}); err != nil {
Expand All @@ -159,6 +172,8 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
return ctrl.Result{}, err
}
}

// Remove job finalizer
if !object.GetDeletionTimestamp().IsZero() {
if err = r.finalizeJob(ctx, job); err != nil {
return ctrl.Result{}, err
Expand Down Expand Up @@ -309,8 +324,8 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
return ctrl.Result{}, fmt.Errorf("clearing admission: %w", err)
}
}
return ctrl.Result{}, nil
}
return ctrl.Result{}, nil
}

// 7. handle job is suspended.
Expand Down Expand Up @@ -402,19 +417,19 @@ func (r *JobReconciler) ensureOneWorkload(ctx context.Context, job GenericJob, o
// Find a matching workload first if there is one.
var toDelete []*kueue.Workload
var match *kueue.Workload
var workloads kueue.WorkloadList
if err := r.client.List(ctx, &workloads, client.InNamespace(object.GetNamespace()),
client.MatchingFields{getOwnerKey(job.GVK()): object.GetName()}); err != nil {
log.Error(err, "Unable to list child workloads")
return nil, err
}

for i := range workloads.Items {
w := &workloads.Items[i]
if match == nil && r.equivalentToWorkload(job, object, w) {
match = w
} else {
toDelete = append(toDelete, w)
if cj, implements := job.(ComposableJob); implements {
var err error
match, toDelete, err = cj.FindMatchingWorkloads(ctx, r.client)
if err != nil {
log.Error(err, "Composable job is unable to find matching workloads")
return nil, err
}
} else {
var err error
match, toDelete, err = FindMatchingWorkloads(ctx, r.client, job)
if err != nil {
log.Error(err, "Unable to list child workloads")
return nil, err
}
}

Expand All @@ -428,11 +443,11 @@ func (r *JobReconciler) ensureOneWorkload(ctx context.Context, job GenericJob, o
if match == nil && !job.IsSuspended() {
log.V(2).Info("job with no matching workload, suspending")
var w *kueue.Workload
if len(workloads.Items) == 1 {
if len(toDelete) == 1 {
// The job may have been modified and hence the existing workload
// doesn't match the job anymore. All bets are off if there are more
// than one workload...
w = &workloads.Items[0]
w = toDelete[0]
}

if _, finished := job.Finished(); finished {
Expand Down Expand Up @@ -480,16 +495,37 @@ func (r *JobReconciler) ensureOneWorkload(ctx context.Context, job GenericJob, o
return match, nil
}

func FindMatchingWorkloads(ctx context.Context, c client.Client, job GenericJob) (match *kueue.Workload, toDelete []*kueue.Workload, err error) {
object := job.Object()

workloads := &kueue.WorkloadList{}
if err := c.List(ctx, workloads, client.InNamespace(object.GetNamespace()),
client.MatchingFields{getOwnerKey(job.GVK()): object.GetName()}); err != nil {
return nil, nil, err
}

for i := range workloads.Items {
w := &workloads.Items[i]
if match == nil && equivalentToWorkload(job, w) {
match = w
} else {
toDelete = append(toDelete, w)
}
}

return match, toDelete, nil
}

// equivalentToWorkload checks if the job corresponds to the workload
func (r *JobReconciler) equivalentToWorkload(job GenericJob, object client.Object, wl *kueue.Workload) bool {
func equivalentToWorkload(job GenericJob, wl *kueue.Workload) bool {
owner := metav1.GetControllerOf(wl)
// Indexes don't work in unit tests, so we explicitly check for the
// owner here.
if owner.Name != object.GetName() {
if owner.Name != job.Object().GetName() {
return false
}

jobPodSets := resetMinCounts(job.PodSets())
jobPodSets := clearMinCountsIfFeatureDisabled(job.PodSets())

if !workload.CanBePartiallyAdmitted(wl) || !workload.HasQuotaReservation(wl) {
// the two sets should fully match.
Expand Down Expand Up @@ -525,6 +561,10 @@ func (r *JobReconciler) updateWorkloadToMatchJob(ctx context.Context, job Generi
if err != nil {
return nil, fmt.Errorf("can't construct workload for update: %w", err)
}
err = r.prepareWorkload(ctx, job, newWl)
if err != nil {
return nil, fmt.Errorf("can't construct workload for update: %w", err)
}
wl.Spec = newWl.Spec
if err = r.client.Update(ctx, wl); err != nil {
return nil, fmt.Errorf("updating existed workload: %w", err)
Expand Down Expand Up @@ -558,7 +598,7 @@ func (r *JobReconciler) startJob(ctx context.Context, job GenericJob, object cli
// stopJob will suspend the job, and also restore node affinity, reset job status if needed.
// Returns whether any operation was done to stop the job or an error.
func (r *JobReconciler) stopJob(ctx context.Context, job GenericJob, object client.Object, wl *kueue.Workload, eventMsg string) error {
info := getPodSetsInfoFromWorkload(wl)
info := GetPodSetsInfoFromWorkload(wl)

if jws, implements := job.(JobWithCustomStop); implements {
stoppedNow, err := jws.Stop(ctx, r.client, info, eventMsg)
Expand Down Expand Up @@ -605,6 +645,15 @@ func (r *JobReconciler) removeFinalizer(ctx context.Context, wl *kueue.Workload)
func (r *JobReconciler) constructWorkload(ctx context.Context, job GenericJob, object client.Object) (*kueue.Workload, error) {
log := ctrl.LoggerFrom(ctx)

if cj, implements := job.(ComposableJob); implements {
wl, err := cj.ConstructComposableWorkload(ctx, r.client, r.record)
if err != nil {
return nil, err
}

return wl, nil
}

podSets := job.PodSets()

wl := &kueue.Workload{
Expand All @@ -615,7 +664,7 @@ func (r *JobReconciler) constructWorkload(ctx context.Context, job GenericJob, o
Finalizers: []string{kueue.ResourceInUseFinalizerName},
},
Spec: kueue.WorkloadSpec{
PodSets: resetMinCounts(podSets),
PodSets: podSets,
QueueName: QueueName(job),
},
}
Expand All @@ -631,19 +680,26 @@ func (r *JobReconciler) constructWorkload(ctx context.Context, job GenericJob, o
)
}

priorityClassName, source, p, err := r.extractPriority(ctx, podSets, job)
if err != nil {
if err := ctrl.SetControllerReference(object, wl, r.client.Scheme()); err != nil {
return nil, err
}
return wl, nil
}

// prepareWorkload adds the priority information for the constructed workload
func (r *JobReconciler) prepareWorkload(ctx context.Context, job GenericJob, wl *kueue.Workload) error {
priorityClassName, source, p, err := r.extractPriority(ctx, wl.Spec.PodSets, job)
if err != nil {
return err
}

wl.Spec.PriorityClassName = priorityClassName
wl.Spec.Priority = &p
wl.Spec.PriorityClassSource = source

if err := ctrl.SetControllerReference(object, wl, r.client.Scheme()); err != nil {
return nil, err
}
return wl, nil
wl.Spec.PodSets = clearMinCountsIfFeatureDisabled(wl.Spec.PodSets)

return nil
}

func (r *JobReconciler) extractPriority(ctx context.Context, podSets []kueue.PodSet, job GenericJob) (string, string, int32, error) {
Expand Down Expand Up @@ -711,6 +767,10 @@ func (r *JobReconciler) handleJobWithNoWorkload(ctx context.Context, job Generic
if err != nil {
return err
}
err = r.prepareWorkload(ctx, job, wl)
if err != nil {
return err
}
if err = r.client.Create(ctx, wl); err != nil {
return err
}
Expand All @@ -719,6 +779,14 @@ func (r *JobReconciler) handleJobWithNoWorkload(ctx context.Context, job Generic
return nil
}

func (r *JobReconciler) ignoreUnretryableError(log logr.Logger, err error) error {
if IsUnretryableError(err) {
log.V(2).Info("Received an unretryable error", "error", err)
return nil
}
return err
}

func generatePodsReadyCondition(job GenericJob, wl *kueue.Workload) metav1.Condition {
conditionStatus := metav1.ConditionFalse
message := "Not all pods are ready or succeeded"
Expand All @@ -739,9 +807,9 @@ func generatePodsReadyCondition(job GenericJob, wl *kueue.Workload) metav1.Condi
}
}

// getPodSetsInfoFromWorkload retrieve the podSetsInfo slice from the
// GetPodSetsInfoFromWorkload retrieve the podSetsInfo slice from the
// provided workload's spec
func getPodSetsInfoFromWorkload(wl *kueue.Workload) []podset.PodSetInfo {
func GetPodSetsInfoFromWorkload(wl *kueue.Workload) []podset.PodSetInfo {
if wl == nil {
return nil
}
Expand Down Expand Up @@ -774,17 +842,17 @@ func (r *genericReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
}

func (r *genericReconciler) SetupWithManager(mgr ctrl.Manager) error {
builder := ctrl.NewControllerManagedBy(mgr).
b := ctrl.NewControllerManagedBy(mgr).
For(r.newJob().Object()).
Owns(&kueue.Workload{})
Owns(&kueue.Workload{}, builder.MatchEveryOwner)
if r.newWorkloadHandler != nil {
builder = builder.Watches(&kueue.Workload{}, r.newWorkloadHandler(mgr.GetClient()))
b = b.Watches(&kueue.Workload{}, r.newWorkloadHandler(mgr.GetClient()))
}
return builder.Complete(r)
return b.Complete(r)
}

// resets the minCount for all podSets if the PartialAdmission feature is not enabled
func resetMinCounts(in []kueue.PodSet) []kueue.PodSet {
// clearMinCountsIfFeatureDisabled sets the minCount for all podSets to nil if the PartialAdmission feature is not enabled
func clearMinCountsIfFeatureDisabled(in []kueue.PodSet) []kueue.PodSet {
if features.Enabled(features.PartialAdmission) || len(in) == 0 {
return in
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/jobframework/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ var (

func ValidateCreateForQueueName(job GenericJob) field.ErrorList {
var allErrs field.ErrorList
allErrs = append(allErrs, validateLabelAsCRDName(job, constants.QueueLabel)...)
allErrs = append(allErrs, ValidateLabelAsCRDName(job, constants.QueueLabel)...)
allErrs = append(allErrs, ValidateAnnotationAsCRDName(job, constants.QueueAnnotation)...)
return allErrs
}
Expand All @@ -48,7 +48,7 @@ func ValidateAnnotationAsCRDName(job GenericJob, crdNameAnnotation string) field
return allErrs
}

func validateLabelAsCRDName(job GenericJob, crdNameLabel string) field.ErrorList {
func ValidateLabelAsCRDName(job GenericJob, crdNameLabel string) field.ErrorList {
var allErrs field.ErrorList
if value, exists := job.Object().GetLabels()[crdNameLabel]; exists {
if errs := validation.IsDNS1123Subdomain(value); len(errs) > 0 {
Expand Down
Loading

0 comments on commit c264f4c

Please sign in to comment.