diff --git a/pkg/util/podstore/podstore.go b/pkg/util/podstore/podstore.go index 7b32a0d5618..469669b1f3d 100644 --- a/pkg/util/podstore/podstore.go +++ b/pkg/util/podstore/podstore.go @@ -98,15 +98,37 @@ func NewPodStore(podInformer cache.SharedIndexInformer) *PodStore { } func (s *PodStore) onPodUpdate(oldObj interface{}, newObj interface{}) { + oldPod, ok := oldObj.(*corev1.Pod) + if !ok { + klog.ErrorS(nil, "Received unexpected object", "oldObj", oldObj) + return + } newPod, ok := newObj.(*corev1.Pod) if !ok { klog.ErrorS(nil, "Received unexpected object", "newObj", newObj) return } - err := s.pods.Update(newPod) - if err != nil { - klog.ErrorS(err, "Error when updating Pod in index") - return + + // From https://pkg.go.dev/k8s.io/client-go/tools/cache#SharedInformer: + // Because `ObjectMeta.UID` has no role in identifying objects, it is possible that when (1) + // object O1 with ID (e.g. namespace and name) X and `ObjectMeta.UID` U1 in the + // SharedInformer's local cache is deleted and later (2) another object O2 with ID X and + // ObjectMeta.UID U2 is created the informer's clients are not notified of (1) and (2) but + // rather are notified only of an update from O1 to O2. Clients that need to detect such + // cases might do so by comparing the `ObjectMeta.UID` field of the old and the new object + // in the code that handles update notifications (i.e. `OnUpdate` method of + // ResourceEventHandler). + if oldPod.UID != newPod.UID { + if err := s.deletePod(oldPod); err != nil { + klog.ErrorS(err, "Error when deleting Pod from store", "Pod", klog.KObj(oldPod), "UID", oldPod.UID) + } + if err := s.addPod(newPod); err != nil { + klog.ErrorS(err, "Error when adding Pod to store", "Pod", klog.KObj(newPod), "UID", newPod.UID) + } + } else { + if err := s.updatePod(newPod); err != nil { + klog.ErrorS(err, "Error when updating Pod in store", "Pod", klog.KObj(newPod), "UID", newPod.UID) + } } klog.V(4).InfoS("Processed Pod Update Event", "Pod", klog.KObj(newPod)) } @@ -117,19 +139,8 @@ func (s *PodStore) onPodCreate(obj interface{}) { klog.ErrorS(nil, "Received unexpected object", "obj", obj) return } - s.mutex.Lock() - defer s.mutex.Unlock() - timeNow := s.clock.Now() - err := s.pods.Add(pod) - if err != nil { - klog.ErrorS(err, "Error when adding Pod to index") - return - } - switch pod.Status.Phase { - case corev1.PodPending: - s.timestampMap[pod.UID] = &podTimestamps{CreationTimestamp: timeNow} - default: - s.timestampMap[pod.UID] = &podTimestamps{CreationTimestamp: pod.CreationTimestamp.Time} + if err := s.addPod(pod); err != nil { + klog.ErrorS(err, "Error when adding Pod to store", "Pod", klog.KObj(pod), "UID", pod.UID) } klog.V(4).InfoS("Processed Pod Create Event", "Pod", klog.KObj(pod)) } @@ -144,17 +155,47 @@ func (s *PodStore) onPodDelete(obj interface{}) { return } } + if err := s.deletePod(pod); err != nil { + klog.ErrorS(err, "Error when deleting Pod from store", "Pod", klog.KObj(pod), "UID", pod.UID) + } + klog.V(4).InfoS("Processed Pod Delete Event", "Pod", klog.KObj(pod)) +} + +func (s *PodStore) addPod(pod *corev1.Pod) error { + timeNow := s.clock.Now() s.mutex.Lock() defer s.mutex.Unlock() + err := s.pods.Add(pod) + if err != nil { + return fmt.Errorf("error when adding Pod to index: %w", err) + } + switch pod.Status.Phase { + case corev1.PodPending: + s.timestampMap[pod.UID] = &podTimestamps{CreationTimestamp: timeNow} + default: + s.timestampMap[pod.UID] = &podTimestamps{CreationTimestamp: pod.CreationTimestamp.Time} + } + return nil +} + +func (s *PodStore) updatePod(pod *corev1.Pod) error { + if err := s.pods.Update(pod); err != nil { + return fmt.Errorf("error when updating Pod in index: %w", err) + } + return nil +} + +func (s *PodStore) deletePod(pod *corev1.Pod) error { timeNow := s.clock.Now() + s.mutex.Lock() + defer s.mutex.Unlock() timestamp, ok := s.timestampMap[pod.UID] if !ok { - klog.ErrorS(nil, "Cannot find podTimestamps in timestampMap", "UID", pod.UID) - return + return fmt.Errorf("cannot find podTimestamps in timestampMap") } timestamp.DeletionTimestamp = &timeNow s.podsToDelete.AddAfter(pod.UID, s.delayTime) - klog.V(4).InfoS("Processed Pod Delete Event", "Pod", klog.KObj(pod)) + return nil } func (s *PodStore) checkDeletedPod(obj interface{}) (*corev1.Pod, error) { diff --git a/pkg/util/podstore/podstore_test.go b/pkg/util/podstore/podstore_test.go index 44da0b64946..8e3c17f1223 100644 --- a/pkg/util/podstore/podstore_test.go +++ b/pkg/util/podstore/podstore_test.go @@ -132,46 +132,70 @@ var ( ) func Test_onPodUpdate(t *testing.T) { - newPod1 := &v1.Pod{ - Status: v1.PodStatus{ - PodIPs: []v1.PodIP{ - { - IP: "4.5.6.7", - }, - }, - }, + fakeClock := clock.NewFakeClock(time.Now()) + oldPod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "pod1", Namespace: "pod1_ns", UID: "pod1", }, } + newPod1 := oldPod.DeepCopy() + newPod1.Status.PodIPs = []v1.PodIP{ + { + IP: "4.5.6.7", + }, + } + newPod2 := oldPod.DeepCopy() + newPod2.UID = "pod1_new" tests := []struct { - name string - oldObj interface{} - newObj interface{} - expectedPod *v1.Pod + name string + newObj interface{} + expectedPods []*v1.Pod + oldPodDeleted bool }{ { - name: "newObj is not Pod", - newObj: node, - expectedPod: pod1, + name: "newObj is not Pod", + newObj: node, + expectedPods: []*v1.Pod{oldPod}, + }, + { + name: "Pod IP update", + newObj: newPod1, + expectedPods: []*v1.Pod{newPod1}, }, { - name: "valid case", - newObj: newPod1, - expectedPod: newPod1, + name: "same name, new UID", + newObj: newPod2, + expectedPods: []*v1.Pod{oldPod, newPod2}, + oldPodDeleted: true, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { podStore := &PodStore{ - pods: cache.NewIndexer(podKeyFunc, cache.Indexers{podIPIndex: podIPIndexFunc}), + timestampMap: map[types.UID]*podTimestamps{}, + clock: fakeClock, + pods: cache.NewIndexer(podKeyFunc, cache.Indexers{podIPIndex: podIPIndexFunc}), + podsToDelete: workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[types.UID]{ + Name: deleteQueueName, + Clock: fakeClock, + }), + } + require.NoError(t, podStore.addPod(oldPod)) + podStore.onPodUpdate(oldPod, tt.newObj) + pods := make([]*v1.Pod, 0) + for _, obj := range podStore.pods.List() { + pods = append(pods, obj.(*v1.Pod)) + } + assert.ElementsMatch(t, tt.expectedPods, pods) + if tt.oldPodDeleted { + require.Equal(t, 1, podStore.podsToDelete.Len()) + uid, _ := podStore.podsToDelete.Get() + assert.Equal(t, oldPod.UID, uid) + } else { + assert.Equal(t, 0, podStore.podsToDelete.Len()) } - require.NoError(t, podStore.pods.Add(pod1)) - podStore.onPodUpdate(tt.oldObj, tt.newObj) - require.Len(t, podStore.pods.List(), 1) - assert.Equal(t, tt.expectedPod, podStore.pods.List()[0].(*v1.Pod)) }) } }