Handle Pod UID updates in PodStore #6964

Merged
pkg/util/podstore/podstore.go (61 additions, 20 deletions)

@@ -98,15 +98,37 @@ func NewPodStore(podInformer cache.SharedIndexInformer) *PodStore {
}

func (s *PodStore) onPodUpdate(oldObj interface{}, newObj interface{}) {
oldPod, ok := oldObj.(*corev1.Pod)
if !ok {
klog.ErrorS(nil, "Received unexpected object", "oldObj", oldObj)
return
}
newPod, ok := newObj.(*corev1.Pod)
if !ok {
klog.ErrorS(nil, "Received unexpected object", "newObj", newObj)
return
}

// From https://pkg.go.dev/k8s.io/client-go/tools/cache#SharedInformer:
// Because `ObjectMeta.UID` has no role in identifying objects, it is possible that when (1)
// object O1 with ID (e.g. namespace and name) X and `ObjectMeta.UID` U1 in the
// SharedInformer's local cache is deleted and later (2) another object O2 with ID X and
// ObjectMeta.UID U2 is created the informer's clients are not notified of (1) and (2) but
// rather are notified only of an update from O1 to O2. Clients that need to detect such
// cases might do so by comparing the `ObjectMeta.UID` field of the old and the new object
// in the code that handles update notifications (i.e. `OnUpdate` method of
// ResourceEventHandler).
Comment on lines +112 to +120 — Contributor Author:
@tnqn I have not been able to reproduce this, but looking at the issue that triggered this documentation update, it seems it can only happen if a watch is interrupted.

There was a similar change in kube-scheduler at some point: kubernetes/kubernetes#91126. The logic was later changed, but I believe the current code still accounts for the fact that a Delete and an Add event can be "merged" into an Update under certain rare conditions.

Now I am wondering: is it ever safe to assume that the update event handler will never observe a change to an immutable field? If oldObj and newObj can have different UIDs, it feels like other fields which are supposed to be immutable can also change through an "update" (of course these are 2 distinct objects, but with the same name, so from the point of view of the event handler they are the "same"). Am I overthinking this?

Member @tnqn (Feb 25, 2025):
I think you are not overthinking this. We should establish a principle: do not assume that the two objects received by an update handler refer to the same underlying object. I think we haven't encountered issues so far because:

  1. It's a very rare corner case;
  2. In most cases we don't care about the UID or these immutable fields. I can find one place that may have the problem:
    // For Pod, we only care about PodIP and NodeName update.
    // Some other attributes we care about are immutable, e.g. the named ContainerPort.
    but it's unlikely to trigger, because two Pods with the same name are unlikely to have the same PodIP but different containerPorts (although we should still fix it).

Contributor Author:

I think I identified another possible controller with this issue earlier: #6965
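The principle discussed above — when an update notification carries a new UID, treat it as a delete of the old object followed by an add of the new one — can be sketched with plain types. This is a hypothetical minimal store for illustration only (`Obj`, `Store`, and `OnUpdate` are invented names, not the PodStore API):

```go
package main

import "fmt"

// Obj stands in for a Kubernetes object: identified by name in the
// informer cache, but with a UID that is unique per incarnation.
type Obj struct {
	Name string
	UID  string
	IP   string
}

// Store indexes objects by UID, like PodStore's timestampMap.
type Store struct {
	byUID map[string]Obj
}

func NewStore() *Store { return &Store{byUID: map[string]Obj{}} }

func (s *Store) Add(o Obj)    { s.byUID[o.UID] = o }
func (s *Store) Delete(o Obj) { delete(s.byUID, o.UID) }

// OnUpdate mirrors the structure of onPodUpdate in this PR: a matching
// UID means an in-place update; different UIDs mean the old object was
// deleted and a new one created while the watch was interrupted, so the
// handler must split the event into a delete and an add.
func (s *Store) OnUpdate(oldObj, newObj Obj) {
	if oldObj.UID != newObj.UID {
		s.Delete(oldObj)
		s.Add(newObj)
		return
	}
	s.byUID[newObj.UID] = newObj
}

func main() {
	s := NewStore()
	o1 := Obj{Name: "pod1", UID: "u1", IP: "1.2.3.4"}
	s.Add(o1)

	// Recreated object: same name, new UID — delivered as an "update".
	o2 := Obj{Name: "pod1", UID: "u2", IP: "4.5.6.7"}
	s.OnUpdate(o1, o2)

	_, oldPresent := s.byUID["u1"]
	fmt.Println(oldPresent, len(s.byUID)) // false 1
}
```

Without the UID comparison, the entry keyed by the old UID would leak forever, which is exactly the class of bug this PR fixes in PodStore.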

if oldPod.UID != newPod.UID {
if err := s.deletePod(oldPod); err != nil {
klog.ErrorS(err, "Error when deleting Pod from store", "Pod", klog.KObj(oldPod), "UID", oldPod.UID)
}
if err := s.addPod(newPod); err != nil {
klog.ErrorS(err, "Error when adding Pod to store", "Pod", klog.KObj(newPod), "UID", newPod.UID)
}
} else {
if err := s.updatePod(newPod); err != nil {
klog.ErrorS(err, "Error when updating Pod in store", "Pod", klog.KObj(newPod), "UID", newPod.UID)
}
}
klog.V(4).InfoS("Processed Pod Update Event", "Pod", klog.KObj(newPod))
}
@@ -117,19 +139,8 @@ func (s *PodStore) onPodCreate(obj interface{}) {
klog.ErrorS(nil, "Received unexpected object", "obj", obj)
return
}
if err := s.addPod(pod); err != nil {
klog.ErrorS(err, "Error when adding Pod to store", "Pod", klog.KObj(pod), "UID", pod.UID)
}
klog.V(4).InfoS("Processed Pod Create Event", "Pod", klog.KObj(pod))
}
@@ -144,17 +155,47 @@ func (s *PodStore) onPodDelete(obj interface{}) {
return
}
}
if err := s.deletePod(pod); err != nil {
klog.ErrorS(err, "Error when deleting Pod from store", "Pod", klog.KObj(pod), "UID", pod.UID)
}
klog.V(4).InfoS("Processed Pod Delete Event", "Pod", klog.KObj(pod))
}

func (s *PodStore) addPod(pod *corev1.Pod) error {
timeNow := s.clock.Now()
s.mutex.Lock()
defer s.mutex.Unlock()
err := s.pods.Add(pod)
if err != nil {
return fmt.Errorf("error when adding Pod to index: %w", err)
}
switch pod.Status.Phase {
case corev1.PodPending:
s.timestampMap[pod.UID] = &podTimestamps{CreationTimestamp: timeNow}
default:
s.timestampMap[pod.UID] = &podTimestamps{CreationTimestamp: pod.CreationTimestamp.Time}
}
return nil
}

func (s *PodStore) updatePod(pod *corev1.Pod) error {
if err := s.pods.Update(pod); err != nil {
return fmt.Errorf("error when updating Pod in index: %w", err)
}
return nil
}

func (s *PodStore) deletePod(pod *corev1.Pod) error {
timeNow := s.clock.Now()
s.mutex.Lock()
defer s.mutex.Unlock()
timestamp, ok := s.timestampMap[pod.UID]
if !ok {
return fmt.Errorf("cannot find podTimestamps in timestampMap")
}
timestamp.DeletionTimestamp = &timeNow
s.podsToDelete.AddAfter(pod.UID, s.delayTime)
return nil
}

func (s *PodStore) checkDeletedPod(obj interface{}) (*corev1.Pod, error) {
pkg/util/podstore/podstore_test.go (47 additions, 23 deletions)

@@ -132,46 +132,70 @@ var (
)

func Test_onPodUpdate(t *testing.T) {
fakeClock := clock.NewFakeClock(time.Now())
oldPod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
Namespace: "pod1_ns",
UID: "pod1",
},
}
newPod1 := oldPod.DeepCopy()
newPod1.Status.PodIPs = []v1.PodIP{
{
IP: "4.5.6.7",
},
}
newPod2 := oldPod.DeepCopy()
newPod2.UID = "pod1_new"
tests := []struct {
name string
newObj interface{}
expectedPods []*v1.Pod
oldPodDeleted bool
}{
{
name: "newObj is not Pod",
newObj: node,
expectedPods: []*v1.Pod{oldPod},
},
{
name: "Pod IP update",
newObj: newPod1,
expectedPods: []*v1.Pod{newPod1},
},
{
name: "same name, new UID",
newObj: newPod2,
expectedPods: []*v1.Pod{oldPod, newPod2},
oldPodDeleted: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
podStore := &PodStore{
timestampMap: map[types.UID]*podTimestamps{},
clock: fakeClock,
pods: cache.NewIndexer(podKeyFunc, cache.Indexers{podIPIndex: podIPIndexFunc}),
podsToDelete: workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[types.UID]{
Name: deleteQueueName,
Clock: fakeClock,
}),
}
require.NoError(t, podStore.addPod(oldPod))
podStore.onPodUpdate(oldPod, tt.newObj)
pods := make([]*v1.Pod, 0)
for _, obj := range podStore.pods.List() {
pods = append(pods, obj.(*v1.Pod))
}
assert.ElementsMatch(t, tt.expectedPods, pods)
if tt.oldPodDeleted {
require.Equal(t, 1, podStore.podsToDelete.Len())
uid, _ := podStore.podsToDelete.Get()
assert.Equal(t, oldPod.UID, uid)
} else {
assert.Equal(t, 0, podStore.podsToDelete.Len())
}
})
}
}