Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
mimowo committed Oct 21, 2024
1 parent 2809d88 commit eafdc7f
Show file tree
Hide file tree
Showing 4 changed files with 596 additions and 99 deletions.
5 changes: 4 additions & 1 deletion pkg/controller/tas/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ const (

func SetupIndexes(ctx context.Context, indexer client.FieldIndexer) error {
return indexer.IndexField(ctx, &corev1.Pod{}, workloadNameKey, func(o client.Object) []string {
pod := o.(*corev1.Pod)
pod, ok := o.(*corev1.Pod)
if !ok {
return nil
}
value, found := pod.Annotations[kueuealpha.WorkloadAnnotation]
if !found {
return nil
Expand Down
108 changes: 49 additions & 59 deletions pkg/controller/tas/topology_ungater.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package tas

import (
"context"
"slices"
"time"

"github.com/go-logr/logr"
Expand All @@ -41,9 +42,12 @@ import (
"sigs.k8s.io/kueue/pkg/util/parallelize"
utilpod "sigs.k8s.io/kueue/pkg/util/pod"
utiltas "sigs.k8s.io/kueue/pkg/util/tas"
"sigs.k8s.io/kueue/pkg/workload"
)

const (
// batch reconciles in 1s intervals to avoid cascades of reconciles when
// a large Job creates its Pods.
ungateBatchPeriod = time.Second
)

Expand Down Expand Up @@ -85,63 +89,51 @@ type podHandler struct {
}

func (h *podHandler) Create(_ context.Context, e event.CreateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
pod, isPod := e.Object.(*corev1.Pod)
if !isPod {
return
}
h.queueReconcileForPod(pod, q)
h.queueReconcileForPod(e.Object, false, q)
}

func (h *podHandler) Update(ctx context.Context, e event.UpdateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
oldPod, isOldPod := e.ObjectOld.(*corev1.Pod)
newPod, isNewPod := e.ObjectNew.(*corev1.Pod)
if !isOldPod || !isNewPod {
return
}
h.queueReconcileForPod(oldPod, q)
h.queueReconcileForPod(newPod, q)
func (h *podHandler) Update(_ context.Context, e event.UpdateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
h.queueReconcileForPod(e.ObjectNew, false, q)
}

func (h *podHandler) Delete(_ context.Context, e event.DeleteEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
pod, isPod := e.Object.(*corev1.Pod)
if !isPod {
return
}
h.queueReconcileForPod(pod, q)
h.queueReconcileForPod(e.Object, true, q)
}

func (h *podHandler) queueReconcileForPod(pod *corev1.Pod, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
if pod == nil {
return
}
if !utilpod.HasGate(pod, kueuealpha.TopologySchedulingGate) {
func (h *podHandler) queueReconcileForPod(object client.Object, deleted bool, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
pod, isPod := object.(*corev1.Pod)
if !isPod {
return
}
if wlName, found := pod.Annotations[kueuealpha.WorkloadAnnotation]; found {
q.AddAfter(reconcile.Request{NamespacedName: types.NamespacedName{
key := types.NamespacedName{
Name: wlName,
Namespace: pod.Namespace,
}}, ungateBatchPeriod)
}
q.AddAfter(reconcile.Request{NamespacedName: key}, ungateBatchPeriod)
}
}

func (h *podHandler) Generic(context.Context, event.GenericEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) {
}

func (r *topologyUngater) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
log := ctrl.LoggerFrom(ctx).WithValues("workload", req.NamespacedName.Name)
log := ctrl.LoggerFrom(ctx).WithValues("workload", req.NamespacedName.String())
log.V(2).Info("Reconcile Topology Ungater")

wl := &kueue.Workload{}
if err := r.client.Get(ctx, req.NamespacedName, wl); err != nil {
if client.IgnoreNotFound(err) != nil {
return reconcile.Result{}, err
}
log.Info("workload not found")
log.V(5).Info("workload not found")
return reconcile.Result{}, nil
}
if wl.Status.Admission == nil {
log.Info("workload is not admitted")
if !isWorkloadAdmittedByTAS(wl) {
// this is a safeguard. In particular, it helps to prevent the race
// condition if the workload is evicted before the reconcile is
// triggered.
log.V(5).Info("workload is not admitted by TAS")
return reconcile.Result{}, nil
}

Expand All @@ -153,14 +145,15 @@ func (r *topologyUngater) Reconcile(ctx context.Context, req reconcile.Request)
log.Error(err, "failed to identify pods to ungate", "podset", psa.Name, "count", psa.Count)
return reconcile.Result{}, err
} else {
log.Info("identified pods to ungate for podset", "podset", psa.Name, "count", len(toUngate))
log.V(2).Info("identified pods to ungate for podset", "podset", psa.Name, "count", len(toUngate))
allToUngate = append(allToUngate, toUngate...)
}
}
}
var err error
if len(allToUngate) > 0 {
log.V(2).Info("identified pods to ungate", "count", len(allToUngate))

err = parallelize.Until(ctx, len(allToUngate), func(i int) error {
podWithUngateInfo := &allToUngate[i]
e := utilclient.Patch(ctx, r.client, podWithUngateInfo.pod, true, func() (bool, error) {
Expand All @@ -175,6 +168,7 @@ func (r *topologyUngater) Reconcile(ctx context.Context, req reconcile.Request)
return true, nil
})
if e != nil {
// We won't observe this cleanup in the event handler.
log.Error(e, "failed ungating pod", "pod", klog.KObj(podWithUngateInfo.pod))
}
return e
Expand All @@ -187,44 +181,35 @@ func (r *topologyUngater) Reconcile(ctx context.Context, req reconcile.Request)
}

func (r *topologyUngater) Create(event event.CreateEvent) bool {
wl, isWl := event.Object.(*kueue.Workload)
if isWl {
return isTASWorkload(wl)
}
return true
return isWorkloadAdmittedByTAS(event.Object)
}

func (r *topologyUngater) Delete(event event.DeleteEvent) bool {
wl, isWl := event.Object.(*kueue.Workload)
if isWl {
return isTASWorkload(wl)
}
return true
return isWorkloadAdmittedByTAS(event.Object)
}

func (r *topologyUngater) Update(event event.UpdateEvent) bool {
_, isOldWl := event.ObjectOld.(*kueue.Workload)
newWl, isNewWl := event.ObjectNew.(*kueue.Workload)
if isOldWl && isNewWl {
return isTASWorkload(newWl)
}
return true
return isWorkloadAdmittedByTAS(event.ObjectNew)
}

func isTASWorkload(wl *kueue.Workload) bool {
if wl.Status.Admission == nil {
func (r *topologyUngater) Generic(event event.GenericEvent) bool {
return false
}

func isWorkloadAdmittedByTAS(object client.Object) bool {
wl, isWl := object.(*kueue.Workload)
if !isWl {
return false
}
for _, psa := range wl.Status.Admission.PodSetAssignments {
if psa.TopologyAssignment != nil {
return true
}
}
return false
return isAdmittedByTAS(wl)
}

func (r *topologyUngater) Generic(event event.GenericEvent) bool {
return false
func isAdmittedByTAS(w *kueue.Workload) bool {
return w.Status.Admission != nil && workload.IsAdmitted(w) &&
slices.ContainsFunc(w.Status.Admission.PodSetAssignments,
func(psa kueue.PodSetAssignment) bool {
return psa.TopologyAssignment != nil
})
}

func (r *topologyUngater) podsetPodsToUngate(ctx context.Context, log logr.Logger, wl *kueue.Workload, psa *kueue.PodSetAssignment) ([]podWithUngateInfo, error) {
Expand Down Expand Up @@ -292,9 +277,14 @@ func (r *topologyUngater) podsForDomain(ctx context.Context, ns, wlName, psName
}); err != nil {
return nil, err
}
result := make([]*corev1.Pod, 0)
for _, p := range pods.Items {
result = append(result, &p)
result := make([]*corev1.Pod, 0, len(pods.Items))
for i := range pods.Items {
if pods.Items[i].Status.Phase == corev1.PodFailed {
// ignore failed pods as they need to be replaced, and so we don't
// want to count them as already ungated Pods.
continue
}
result = append(result, &pods.Items[i])
}
return result, nil
}
Loading

0 comments on commit eafdc7f

Please sign in to comment.