Skip to content

Commit

Permalink
TAS: move expecatations out for reuse (kubernetes-sigs#3270)
Browse files Browse the repository at this point in the history
  • Loading branch information
mimowo authored and kannon92 committed Nov 19, 2024
1 parent 5b89db3 commit 32f9fe3
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 14 deletions.
3 changes: 2 additions & 1 deletion pkg/controller/jobs/pod/event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/util/expectations"
)

var (
Expand Down Expand Up @@ -62,7 +63,7 @@ func reconcileRequestForPod(p *corev1.Pod) reconcile.Request {
// podEventHandler will convert reconcile requests for pods in group from "<namespace>/<pod-name>" to
// "group/<namespace>/<group-name>".
type podEventHandler struct {
cleanedUpPodsExpectations *expectationsStore
cleanedUpPodsExpectations *expectations.Store
}

func (h *podEventHandler) Create(ctx context.Context, e event.CreateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
Expand Down
7 changes: 4 additions & 3 deletions pkg/controller/jobs/pod/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"sigs.k8s.io/kueue/pkg/podset"
"sigs.k8s.io/kueue/pkg/util/admissioncheck"
clientutil "sigs.k8s.io/kueue/pkg/util/client"
"sigs.k8s.io/kueue/pkg/util/expectations"
"sigs.k8s.io/kueue/pkg/util/kubeversion"
"sigs.k8s.io/kueue/pkg/util/maps"
"sigs.k8s.io/kueue/pkg/util/parallelize"
Expand Down Expand Up @@ -115,7 +116,7 @@ func init() {

type Reconciler struct {
*jobframework.JobReconciler
expectationsStore *expectationsStore
expectationsStore *expectations.Store
}

func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
Expand All @@ -142,7 +143,7 @@ func NewJob() jobframework.GenericJob {
func NewReconciler(c client.Client, record record.EventRecorder, opts ...jobframework.Option) jobframework.JobReconcilerInterface {
return &Reconciler{
JobReconciler: jobframework.NewReconciler(c, record, opts...),
expectationsStore: newUIDExpectations("finalizedPods"),
expectationsStore: expectations.NewStore("finalizedPods"),
}
}

Expand All @@ -154,7 +155,7 @@ type Pod struct {
unretriableGroup *bool
list corev1.PodList
absentPods int
excessPodExpectations *expectationsStore
excessPodExpectations *expectations.Store
satisfiedExcessPods bool
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/controller/jobs/pod/pod_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ import (
_ "sigs.k8s.io/kueue/pkg/controller/jobs/raycluster"
)

type keyUIDs struct {
key types.NamespacedName
uids []types.UID
}

func TestPodsReady(t *testing.T) {
testCases := map[string]struct {
pod *corev1.Pod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package pod
package expectations

import (
"sync"
Expand All @@ -26,22 +26,22 @@ import (

type uids = sets.Set[types.UID]

// expectationsStore contains UIDs for which we are waiting to observe some change through event handlers.
type expectationsStore struct {
// Store contains UIDs for which we are waiting to observe some change through event handlers.
type Store struct {
sync.Mutex
name string

store map[types.NamespacedName]uids
}

func newUIDExpectations(name string) *expectationsStore {
return &expectationsStore{
func NewStore(name string) *Store {
return &Store{
name: name,
store: make(map[types.NamespacedName]uids),
}
}

func (e *expectationsStore) ExpectUIDs(log logr.Logger, key types.NamespacedName, uids []types.UID) {
func (e *Store) ExpectUIDs(log logr.Logger, key types.NamespacedName, uids []types.UID) {
log.V(3).Info("Expecting UIDs", "store", e.name, "key", key, "uids", uids)
expectedUIDs := sets.New[types.UID](uids...)
e.Lock()
Expand All @@ -55,7 +55,7 @@ func (e *expectationsStore) ExpectUIDs(log logr.Logger, key types.NamespacedName
}
}

func (e *expectationsStore) ObservedUID(log logr.Logger, key types.NamespacedName, uid types.UID) {
func (e *Store) ObservedUID(log logr.Logger, key types.NamespacedName, uid types.UID) {
log.V(3).Info("Observed UID", "store", e.name, "key", key, "uid", uid)
e.Lock()
defer e.Unlock()
Expand All @@ -72,7 +72,7 @@ func (e *expectationsStore) ObservedUID(log logr.Logger, key types.NamespacedNam
}
}

func (e *expectationsStore) Satisfied(log logr.Logger, key types.NamespacedName) bool {
func (e *Store) Satisfied(log logr.Logger, key types.NamespacedName) bool {
e.Lock()
_, found := e.store[key]
e.Unlock()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package pod
package expectations

import (
"testing"
Expand Down Expand Up @@ -47,7 +47,7 @@ func TestExpectations(t *testing.T) {
uids: []types.UID{"a", "b", "c", "x", "y", "z"},
},
}
expectations := newUIDExpectations("test")
expectations := NewStore("test")
err := parallelize.Until(ctx, len(initial), func(i int) error {
e := initial[i]
expectations.ExpectUIDs(log, e.key, e.uids)
Expand Down

0 comments on commit 32f9fe3

Please sign in to comment.