Skip to content

Commit

Permalink
Update pod group support
Browse files Browse the repository at this point in the history
* Add new unit/integration tests.

* Update IsActive function to check whether at least one pod in
  the group is running.

* Update Stop method to skip stopping the pod group if the
  workload is evicted and all pods have failed or are suspended,
  or if at least one of the pods is still running.

* Change the reasons for workload/pod group finalization in
  the Load method.
  • Loading branch information
achernevskii committed Nov 24, 2023
1 parent 302f895 commit 0d878c0
Show file tree
Hide file tree
Showing 8 changed files with 435 additions and 37 deletions.
6 changes: 3 additions & 3 deletions pkg/controller/jobframework/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type JobWithCustomStop interface {
// Stop implements a custom stop procedure.
// The function should be idempotent: not do any API calls if the job is already stopped.
// Returns whether the Job stopped with this call or an error
Stop(ctx context.Context, c client.Client, podSetsInfo []podset.PodSetInfo, eventMsg string) (bool, error)
Stop(ctx context.Context, c client.Client, wl *kueue.Workload, eventMsg string) (bool, error)
}

// JobWithFinalize interface should be implemented by generic jobs,
Expand All @@ -89,8 +89,8 @@ type JobWithPriorityClass interface {
// ComposableJob interface should be implemented by generic jobs that
// are composed out of multiple API objects.
type ComposableJob interface {
// Load loads all members of the composable job
Load(ctx context.Context, c client.Client, key types.NamespacedName) (removeWlFinalizer bool, err error)
// Load loads all members of the composable job. If removeFinalizers == true, workload and job finalizers should be removed.
Load(ctx context.Context, c client.Client, key types.NamespacedName) (removeFinalizers bool, err error)
// ConstructComposableWorkload returns a new Workload that's assembled out of all members of the ComposableJob.
ConstructComposableWorkload(ctx context.Context, c client.Client, r record.EventRecorder) (*kueue.Workload, error)
// FindMatchingWorkloads returns all related workloads, workload that matches the ComposableJob and duplicates that has to be deleted.
Expand Down
20 changes: 11 additions & 9 deletions pkg/controller/jobframework/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,12 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
err = r.ignoreUnretryableError(log, err)
}()

dropWorloadFinalizer := false
dropFinalizers := false
if cJob, isComposable := job.(ComposableJob); isComposable {
dropWorloadFinalizer, err = cJob.Load(ctx, r.client, req.NamespacedName)
dropFinalizers, err = cJob.Load(ctx, r.client, req.NamespacedName)
} else {
err = r.client.Get(ctx, req.NamespacedName, object)
dropWorloadFinalizer = apierrors.IsNotFound(err) || !object.GetDeletionTimestamp().IsZero()
dropFinalizers = apierrors.IsNotFound(err) || !object.GetDeletionTimestamp().IsZero()
}

if jws, implements := job.(JobWithSkip); implements {
Expand All @@ -157,7 +157,8 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
}
}

if dropWorloadFinalizer {
if dropFinalizers {
// Remove workload finalizer
workloads := kueue.WorkloadList{}
if err := r.client.List(ctx, &workloads, client.InNamespace(req.Namespace),
client.MatchingFields{getOwnerKey(job.GVK()): req.Name}); err != nil {
Expand All @@ -171,6 +172,8 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
return ctrl.Result{}, err
}
}

// Remove job finalizer
if !object.GetDeletionTimestamp().IsZero() {
if err = r.finalizeJob(ctx, job); err != nil {
return ctrl.Result{}, err
Expand Down Expand Up @@ -595,10 +598,8 @@ func (r *JobReconciler) startJob(ctx context.Context, job GenericJob, object cli
// stopJob will suspend the job, and also restore node affinity, reset job status if needed.
// Returns whether any operation was done to stop the job or an error.
func (r *JobReconciler) stopJob(ctx context.Context, job GenericJob, object client.Object, wl *kueue.Workload, eventMsg string) error {
info := getPodSetsInfoFromWorkload(wl)

if jws, implements := job.(JobWithCustomStop); implements {
stoppedNow, err := jws.Stop(ctx, r.client, info, eventMsg)
stoppedNow, err := jws.Stop(ctx, r.client, wl, eventMsg)
if stoppedNow {
r.record.Eventf(object, corev1.EventTypeNormal, "Stopped", eventMsg)
}
Expand All @@ -609,6 +610,7 @@ func (r *JobReconciler) stopJob(ctx context.Context, job GenericJob, object clie
return nil
}

info := GetPodSetsInfoFromWorkload(wl)
job.Suspend()
if info != nil {
job.RestorePodSetsInfo(info)
Expand Down Expand Up @@ -804,9 +806,9 @@ func generatePodsReadyCondition(job GenericJob, wl *kueue.Workload) metav1.Condi
}
}

// getPodSetsInfoFromWorkload retrieve the podSetsInfo slice from the
// GetPodSetsInfoFromWorkload retrieve the podSetsInfo slice from the
// provided workload's spec
func getPodSetsInfoFromWorkload(wl *kueue.Workload) []podset.PodSetInfo {
func GetPodSetsInfoFromWorkload(wl *kueue.Workload) []podset.PodSetInfo {
if wl == nil {
return nil
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/controller/jobs/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,9 @@ func (j *Job) Suspend() {
j.Spec.Suspend = ptr.To(true)
}

func (j *Job) Stop(ctx context.Context, c client.Client, podSetsInfo []podset.PodSetInfo, eventMsg string) (bool, error) {
func (j *Job) Stop(ctx context.Context, c client.Client, wl *kueue.Workload, eventMsg string) (bool, error) {
podSetsInfo := jobframework.GetPodSetsInfoFromWorkload(wl)

stoppedNow := false
if !j.IsSuspended() {
j.Suspend()
Expand Down
86 changes: 73 additions & 13 deletions pkg/controller/jobs/pod/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ const (
FrameworkName = "pod"
gateNotFound = -1
ConditionTypeTerminationTarget = "TerminationTarget"
errMsgIncorrectTotalGroupCount = "group total count is different from the actual number of pods in the cluster"
errMsgIncorrectGroupRoleCount = "pod group can't include more than 8 roles"
)

Expand Down Expand Up @@ -219,7 +218,12 @@ func (p *Pod) PodSets() []kueue.PodSet {

// IsActive reports whether at least one pod in the group is in the
// Running phase.
func (p *Pod) IsActive() bool {
	// Index-based iteration avoids copying each corev1.Pod element.
	for i := range p.list.Items {
		if p.list.Items[i].Status.Phase != corev1.PodRunning {
			continue
		}
		return true
	}
	return false
}

func hasPodReadyTrue(conds []corev1.PodCondition) bool {
Expand Down Expand Up @@ -251,7 +255,7 @@ func (p *Pod) GVK() schema.GroupVersionKind {
return gvk
}

func (p *Pod) Stop(ctx context.Context, c client.Client, _ []podset.PodSetInfo, eventMsg string) (bool, error) {
func (p *Pod) Stop(ctx context.Context, c client.Client, wl *kueue.Workload, eventMsg string) (bool, error) {
var podsInGroup []corev1.Pod

if p.isGroup {
Expand All @@ -260,6 +264,32 @@ func (p *Pod) Stop(ctx context.Context, c client.Client, _ []podset.PodSetInfo,
podsInGroup = []corev1.Pod{p.pod}
}

if wl != nil && p.isGroup {
if evCond := apimeta.FindStatusCondition(wl.Status.Conditions, kueue.WorkloadEvicted); evCond != nil && evCond.Status == metav1.ConditionTrue {
allPodsFailed := true
for _, pod := range p.list.Items {
if pod.Status.Phase != corev1.PodFailed {
allPodsFailed = false
break
}
}

// If all pods in the group failed or scheduling gated,
// consider the group stopped and clear wl admission
if allPodsFailed || p.IsSuspended() {
return false, nil
}

// If at least one of the pods in group is running,
// return UnretryableError and keep wl admission
if p.IsActive() {
return false, jobframework.UnretryableError(
fmt.Sprintf("Waiting for evicted pod job '%s.%s' to finish", p.pod.Namespace, p.pod.Name),
)
}
}
}

for i := range podsInGroup {
podInGroup := fromObject(&podsInGroup[i])

Expand Down Expand Up @@ -296,6 +326,25 @@ func (p *Pod) Stop(ctx context.Context, c client.Client, _ []podset.PodSetInfo,
return true, nil
}

// isStopped reports whether every pod under consideration — all pods in
// the group when p is a group, otherwise just p.pod — has a deletion
// timestamp set (i.e. has been stopped).
func (p *Pod) isStopped() bool {
	pods := []corev1.Pod{p.pod}
	if p.isGroup {
		pods = p.list.Items
	}

	for i := range pods {
		// A zero deletion timestamp means this pod has not been stopped yet.
		if pods[i].ObjectMeta.DeletionTimestamp.IsZero() {
			return false
		}
	}

	return true
}

// SetupIndexes registers the workload-owner field index for the pod GVK by
// delegating to jobframework.SetupWorkloadOwnerIndex, so workloads can later
// be listed by their owning pod. Returns any error from the indexer setup.
func SetupIndexes(ctx context.Context, indexer client.FieldIndexer) error {
	return jobframework.SetupWorkloadOwnerIndex(ctx, indexer, gvk)
}
Expand Down Expand Up @@ -362,7 +411,7 @@ func (p *Pod) groupTotalCount() (int, error) {
}

// Load loads all pods in the group
func (p *Pod) Load(ctx context.Context, c client.Client, key types.NamespacedName) (removeWlFinalizer bool, err error) {
func (p *Pod) Load(ctx context.Context, c client.Client, key types.NamespacedName) (removeFinalizers bool, err error) {
if err := c.Get(ctx, key, &p.pod); err != nil {
return apierrors.IsNotFound(err), err
}
Expand All @@ -378,13 +427,23 @@ func (p *Pod) Load(ctx context.Context, c client.Client, key types.NamespacedNam
return false, err
}

// remove wl finalizers if all the pods have a deletion timestamp
for i := range p.list.Items {
if p.list.Items[i].DeletionTimestamp.IsZero() {
return false, nil
wl, _, err := p.FindMatchingWorkloads(ctx, c)
if err != nil {
return false, err
}

if wl != nil {
// If workload is finished remove its finalizer
if apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadFinished) {
return true, nil
}
} else {
if p.isStopped() {
return true, nil
}
}
return true, nil

return false, nil
}

func (p *Pod) constructGroupPodSets(podsInGroup corev1.PodList) ([]kueue.PodSet, error) {
Expand Down Expand Up @@ -470,8 +529,9 @@ func (p *Pod) ConstructComposableWorkload(ctx context.Context, c client.Client,
}

if len(p.list.Items) != groupTotalCount {
r.Eventf(object, corev1.EventTypeWarning, "ErrWorkloadCompose", errMsgIncorrectTotalGroupCount)
return nil, jobframework.UnretryableError(errMsgIncorrectTotalGroupCount)
errMsg := fmt.Sprintf("'%s' group total count is different from the actual number of pods in the cluster", p.groupName())
r.Eventf(object, corev1.EventTypeWarning, "ErrWorkloadCompose", errMsg)
return nil, jobframework.UnretryableError(errMsg)
}

podSets, err = p.constructGroupPodSets(p.list)
Expand Down Expand Up @@ -499,7 +559,7 @@ func (p *Pod) ConstructComposableWorkload(ctx context.Context, c client.Client,
},
}

if p.groupName() == "" {
if !p.isGroup {
wl.Name = jobframework.GetWorkloadNameForOwnerWithGVK(p.pod.GetName(), p.GVK())
jobUid := string(p.Object().GetUID())
if errs := validation.IsValidLabelValue(jobUid); len(errs) == 0 {
Expand All @@ -512,7 +572,7 @@ func (p *Pod) ConstructComposableWorkload(ctx context.Context, c client.Client,
)
}

// add the owner ref
// add the controller ref
if err := controllerutil.SetControllerReference(p.Object(), wl, c.Scheme()); err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 0d878c0

Please sign in to comment.