diff --git a/pkg/controller/jobs/pod/event_handlers.go b/pkg/controller/jobs/pod/event_handlers.go index 62b02826b9..1bf5cc8403 100644 --- a/pkg/controller/jobs/pod/event_handlers.go +++ b/pkg/controller/jobs/pod/event_handlers.go @@ -34,6 +34,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + "sigs.k8s.io/kueue/pkg/util/expectations" ) var ( @@ -62,7 +63,7 @@ func reconcileRequestForPod(p *corev1.Pod) reconcile.Request { // podEventHandler will convert reconcile requests for pods in group from "/" to // "group//". type podEventHandler struct { - cleanedUpPodsExpectations *expectationsStore + cleanedUpPodsExpectations *expectations.Store } func (h *podEventHandler) Create(ctx context.Context, e event.CreateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) { diff --git a/pkg/controller/jobs/pod/pod_controller.go b/pkg/controller/jobs/pod/pod_controller.go index 0ee17b65e1..0709a494d3 100644 --- a/pkg/controller/jobs/pod/pod_controller.go +++ b/pkg/controller/jobs/pod/pod_controller.go @@ -53,6 +53,7 @@ import ( "sigs.k8s.io/kueue/pkg/podset" "sigs.k8s.io/kueue/pkg/util/admissioncheck" clientutil "sigs.k8s.io/kueue/pkg/util/client" + "sigs.k8s.io/kueue/pkg/util/expectations" "sigs.k8s.io/kueue/pkg/util/kubeversion" "sigs.k8s.io/kueue/pkg/util/maps" "sigs.k8s.io/kueue/pkg/util/parallelize" @@ -115,7 +116,7 @@ func init() { type Reconciler struct { *jobframework.JobReconciler - expectationsStore *expectationsStore + expectationsStore *expectations.Store } func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { @@ -142,7 +143,7 @@ func NewJob() jobframework.GenericJob { func NewReconciler(c client.Client, record record.EventRecorder, opts ...jobframework.Option) jobframework.JobReconcilerInterface { return &Reconciler{ JobReconciler: jobframework.NewReconciler(c, record, opts...), - expectationsStore: newUIDExpectations("finalizedPods"), + expectationsStore: expectations.NewStore("finalizedPods"), } } @@ -154,7 +155,7 @@ type Pod struct { unretriableGroup *bool list corev1.PodList absentPods int - excessPodExpectations *expectationsStore + excessPodExpectations *expectations.Store satisfiedExcessPods bool } diff --git a/pkg/controller/jobs/pod/pod_controller_test.go b/pkg/controller/jobs/pod/pod_controller_test.go index 4a6f794521..b7c855b8bd 100644 --- a/pkg/controller/jobs/pod/pod_controller_test.go +++ b/pkg/controller/jobs/pod/pod_controller_test.go @@ -49,6 +49,11 @@ import ( _ "sigs.k8s.io/kueue/pkg/controller/jobs/raycluster" ) +type keyUIDs struct { + key types.NamespacedName + uids []types.UID +} + func TestPodsReady(t *testing.T) { testCases := map[string]struct { pod *corev1.Pod diff --git a/pkg/controller/jobs/pod/expectations.go b/pkg/util/expectations/store.go similarity index 75% rename from pkg/controller/jobs/pod/expectations.go rename to pkg/util/expectations/store.go index 8f07e49813..c9b3070d67 100644 --- a/pkg/controller/jobs/pod/expectations.go +++ b/pkg/util/expectations/store.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package pod +package expectations import ( "sync" @@ -26,22 +26,22 @@ import ( type uids = sets.Set[types.UID] -// expectationsStore contains UIDs for which we are waiting to observe some change through event handlers. -type expectationsStore struct { +// Store contains UIDs for which we are waiting to observe some change through event handlers. +type Store struct { sync.Mutex name string store map[types.NamespacedName]uids } -func newUIDExpectations(name string) *expectationsStore { - return &expectationsStore{ +func NewStore(name string) *Store { + return &Store{ name: name, store: make(map[types.NamespacedName]uids), } } -func (e *expectationsStore) ExpectUIDs(log logr.Logger, key types.NamespacedName, uids []types.UID) { +func (e *Store) ExpectUIDs(log logr.Logger, key types.NamespacedName, uids []types.UID) { log.V(3).Info("Expecting UIDs", "store", e.name, "key", key, "uids", uids) expectedUIDs := sets.New[types.UID](uids...) e.Lock() @@ -55,7 +55,7 @@ func (e *expectationsStore) ExpectUIDs(log logr.Logger, key types.NamespacedName } } -func (e *expectationsStore) ObservedUID(log logr.Logger, key types.NamespacedName, uid types.UID) { +func (e *Store) ObservedUID(log logr.Logger, key types.NamespacedName, uid types.UID) { log.V(3).Info("Observed UID", "store", e.name, "key", key, "uid", uid) e.Lock() defer e.Unlock() @@ -72,7 +72,7 @@ func (e *expectationsStore) ObservedUID(log logr.Logger, key types.NamespacedNam } } -func (e *expectationsStore) Satisfied(log logr.Logger, key types.NamespacedName) bool { +func (e *Store) Satisfied(log logr.Logger, key types.NamespacedName) bool { e.Lock() _, found := e.store[key] e.Unlock() diff --git a/pkg/controller/jobs/pod/expectations_test.go b/pkg/util/expectations/store_test.go similarity index 97% rename from pkg/controller/jobs/pod/expectations_test.go rename to pkg/util/expectations/store_test.go index d8765e23b4..d23ea51b91 100644 --- a/pkg/controller/jobs/pod/expectations_test.go +++ b/pkg/util/expectations/store_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package pod +package expectations import ( "testing" @@ -47,7 +47,7 @@ func TestExpectations(t *testing.T) { uids: []types.UID{"a", "b", "c", "x", "y", "z"}, }, } - expectations := newUIDExpectations("test") + expectations := NewStore("test") err := parallelize.Until(ctx, len(initial), func(i int) error { e := initial[i] expectations.ExpectUIDs(log, e.key, e.uids)