From 6928e3118272f697bcd6108109a214065339ef47 Mon Sep 17 00:00:00 2001 From: Seth Jennings Date: Mon, 21 Aug 2017 15:28:38 -0500 Subject: [PATCH 1/2] UPSTREAM: 50934: Skip non-update endpoint updates --- .../kubernetes/pkg/controller/endpoint/BUILD | 1 + .../endpoint/endpoints_controller.go | 77 +++++++-- .../endpoint/endpoints_controller_test.go | 148 ++++++++++++++++++ 3 files changed, 212 insertions(+), 14 deletions(-) diff --git a/vendor/k8s.io/kubernetes/pkg/controller/endpoint/BUILD b/vendor/k8s.io/kubernetes/pkg/controller/endpoint/BUILD index bd1ea282e49e..df5f0b2db06a 100644 --- a/vendor/k8s.io/kubernetes/pkg/controller/endpoint/BUILD +++ b/vendor/k8s.io/kubernetes/pkg/controller/endpoint/BUILD @@ -50,6 +50,7 @@ go_test( "//pkg/controller:go_default_library", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/runtime", + "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor:k8s.io/apimachinery/pkg/util/intstr", "//vendor:k8s.io/client-go/rest", "//vendor:k8s.io/client-go/tools/cache", diff --git a/vendor/k8s.io/kubernetes/pkg/controller/endpoint/endpoints_controller.go b/vendor/k8s.io/kubernetes/pkg/controller/endpoint/endpoints_controller.go index 54c8b9c33441..5c213b5bed0a 100644 --- a/vendor/k8s.io/kubernetes/pkg/controller/endpoint/endpoints_controller.go +++ b/vendor/k8s.io/kubernetes/pkg/controller/endpoint/endpoints_controller.go @@ -181,6 +181,49 @@ func (e *EndpointController) addPod(obj interface{}) { } } +func podToEndpointAddress(pod *v1.Pod) *v1.EndpointAddress { + return &v1.EndpointAddress{ + IP: pod.Status.PodIP, + NodeName: &pod.Spec.NodeName, + TargetRef: &v1.ObjectReference{ + Kind: "Pod", + Namespace: pod.ObjectMeta.Namespace, + Name: pod.ObjectMeta.Name, + UID: pod.ObjectMeta.UID, + ResourceVersion: pod.ObjectMeta.ResourceVersion, + }} +} + +func podAddressChanged(oldPod, newPod *v1.Pod) bool { + // Convert the pod to an EndpointAddress, clear inert fields, + // and see if they are the same. + newEndpointAddress := podToEndpointAddress(newPod) + oldEndpointAddress := podToEndpointAddress(oldPod) + // Ignore the ResourceVersion because it changes + // with every pod update. This allows the comparison to + // show equality if all other relevant fields match. + newEndpointAddress.TargetRef.ResourceVersion = "" + oldEndpointAddress.TargetRef.ResourceVersion = "" + if reflect.DeepEqual(newEndpointAddress, oldEndpointAddress) { + // The pod has not changed in any way that impacts the endpoints + return false + } + return true +} + +func determineNeededServiceUpdates(oldServices, services sets.String, podChanged bool) sets.String { + if podChanged { + // if the labels and pod changed, all services need to be updated + services = services.Union(oldServices) + } else { + // if only the labels changed, services not common to + // both the new and old service set (i.e the disjunctive union) + // need to be updated + services = services.Difference(oldServices).Union(oldServices.Difference(services)) + } + return services +} + // When a pod is updated, figure out what services it used to be a member of // and what services it will be a member of, and enqueue the union of these. // old and cur must be *v1.Pod types. @@ -192,22 +235,37 @@ func (e *EndpointController) updatePod(old, cur interface{}) { // Two different versions of the same pod will always have different RVs. return } + + podChanged := podAddressChanged(oldPod, newPod) + + // Check if the pod labels have changed, indicating a possibe + // change in the service membership + labelsChanged := false + if !reflect.DeepEqual(newPod.Labels, oldPod.Labels) || + !hostNameAndDomainAreEqual(newPod, oldPod) { + labelsChanged = true + } + + // If both the pod and labels are unchanged, no update is needed + if !podChanged && !labelsChanged { + return + } + services, err := e.getPodServiceMemberships(newPod) if err != nil { utilruntime.HandleError(fmt.Errorf("Unable to get pod %v/%v's service memberships: %v", newPod.Namespace, newPod.Name, err)) return } - // Only need to get the old services if the labels changed. - if !reflect.DeepEqual(newPod.Labels, oldPod.Labels) || - !hostNameAndDomainAreEqual(newPod, oldPod) { + if labelsChanged { oldServices, err := e.getPodServiceMemberships(oldPod) if err != nil { utilruntime.HandleError(fmt.Errorf("Unable to get pod %v/%v's service memberships: %v", oldPod.Namespace, oldPod.Name, err)) return } - services = services.Union(oldServices) + services = determineNeededServiceUpdates(oldServices, services, podChanged) } + for key := range services { e.queue.Add(key) } @@ -379,16 +437,7 @@ func (e *EndpointController) syncService(key string) error { } epp := v1.EndpointPort{Name: portName, Port: int32(portNum), Protocol: portProto} - epa := v1.EndpointAddress{ - IP: pod.Status.PodIP, - NodeName: &pod.Spec.NodeName, - TargetRef: &v1.ObjectReference{ - Kind: "Pod", - Namespace: pod.ObjectMeta.Namespace, - Name: pod.ObjectMeta.Name, - UID: pod.ObjectMeta.UID, - ResourceVersion: pod.ObjectMeta.ResourceVersion, - }} + epa := *podToEndpointAddress(pod) hostname := getHostname(pod) if len(hostname) > 0 && diff --git a/vendor/k8s.io/kubernetes/pkg/controller/endpoint/endpoints_controller_test.go b/vendor/k8s.io/kubernetes/pkg/controller/endpoint/endpoints_controller_test.go index 570e23ca7d01..e091e32ad4db 100644 --- a/vendor/k8s.io/kubernetes/pkg/controller/endpoint/endpoints_controller_test.go +++ b/vendor/k8s.io/kubernetes/pkg/controller/endpoint/endpoints_controller_test.go @@ -25,6 +25,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/sets" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" utiltesting "k8s.io/client-go/util/testing" @@ -561,3 +562,150 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) { }) endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data) } + +func TestPodToEndpointAddress(t *testing.T) { + podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc) + ns := "test" + addPods(podStore, ns, 1, 1, 0) + pods := podStore.List() + if len(pods) != 1 { + t.Errorf("podStore size: expected: %d, got: %d", 1, len(pods)) + return + } + pod := pods[0].(*v1.Pod) + epa := podToEndpointAddress(pod) + if epa.IP != pod.Status.PodIP { + t.Errorf("IP: expected: %s, got: %s", pod.Status.PodIP, epa.IP) + } + if *(epa.NodeName) != pod.Spec.NodeName { + t.Errorf("NodeName: expected: %s, got: %s", pod.Spec.NodeName, *(epa.NodeName)) + } + if epa.TargetRef.Kind != "Pod" { + t.Errorf("TargetRef.Kind: expected: %s, got: %s", "Pod", epa.TargetRef.Kind) + } + if epa.TargetRef.Namespace != pod.ObjectMeta.Namespace { + t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.Namespace, epa.TargetRef.Namespace) + } + if epa.TargetRef.Name != pod.ObjectMeta.Name { + t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.Name, epa.TargetRef.Name) + } + if epa.TargetRef.UID != pod.ObjectMeta.UID { + t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.UID, epa.TargetRef.UID) + } + if epa.TargetRef.ResourceVersion != pod.ObjectMeta.ResourceVersion { + t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.ResourceVersion, epa.TargetRef.ResourceVersion) + } +} + +func TestPodAddressChanged(t *testing.T) { + podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc) + ns := "test" + addPods(podStore, ns, 1, 1, 0) + pods := podStore.List() + if len(pods) != 1 { + t.Errorf("podStore size: expected: %d, got: %d", 1, len(pods)) + return + } + oldPod := pods[0].(*v1.Pod) + objCopy, err := api.Scheme.DeepCopy(oldPod) + if err != nil { + t.Errorf("error copying Pod: %v ", err) + } + copied, ok := objCopy.(*v1.Pod) + if !ok { + t.Errorf("expected pod, got %#v", objCopy) + } + newPod := copied + + if podAddressChanged(oldPod, newPod) { + t.Errorf("Expected address to be unchanged for copied pod") + } + + newPod.Spec.NodeName = "changed" + if !podAddressChanged(oldPod, newPod) { + t.Errorf("Expected address to be changed for pod with NodeName changed") + } + newPod.Spec.NodeName = oldPod.Spec.NodeName + + newPod.ObjectMeta.ResourceVersion = "changed" + if podAddressChanged(oldPod, newPod) { + t.Errorf("Expected address to be unchanged for pod with only ResourceVersion changed") + } + newPod.ObjectMeta.ResourceVersion = oldPod.ObjectMeta.ResourceVersion + + newPod.Status.PodIP = "1.2.3.1" + if !podAddressChanged(oldPod, newPod) { + t.Errorf("Expected address to be changed with pod IP address change") + } + newPod.Status.PodIP = oldPod.Status.PodIP + + newPod.ObjectMeta.Name = "wrong-name" + if !podAddressChanged(oldPod, newPod) { + t.Errorf("Expected address to be changed with pod name change") + } + newPod.ObjectMeta.Name = oldPod.ObjectMeta.Name +} + +func TestDetermineNeededServiceUpdates(t *testing.T) { + testCases := []struct { + name string + a sets.String + b sets.String + union sets.String + xor sets.String + }{ + { + name: "no services changed", + a: sets.NewString("a", "b", "c"), + b: sets.NewString("a", "b", "c"), + xor: sets.NewString(), + union: sets.NewString("a", "b", "c"), + }, + { + name: "all old services removed, new services added", + a: sets.NewString("a", "b", "c"), + b: sets.NewString("d", "e", "f"), + xor: sets.NewString("a", "b", "c", "d", "e", "f"), + union: sets.NewString("a", "b", "c", "d", "e", "f"), + }, + { + name: "all old services removed, no new services added", + a: sets.NewString("a", "b", "c"), + b: sets.NewString(), + xor: sets.NewString("a", "b", "c"), + union: sets.NewString("a", "b", "c"), + }, + { + name: "no old services, but new services added", + a: sets.NewString(), + b: sets.NewString("a", "b", "c"), + xor: sets.NewString("a", "b", "c"), + union: sets.NewString("a", "b", "c"), + }, + { + name: "one service removed, one service added, two unchanged", + a: sets.NewString("a", "b", "c"), + b: sets.NewString("b", "c", "d"), + xor: sets.NewString("a", "d"), + union: sets.NewString("a", "b", "c", "d"), + }, + { + name: "no services", + a: sets.NewString(), + b: sets.NewString(), + xor: sets.NewString(), + union: sets.NewString(), + }, + } + for _, testCase := range testCases { + retval := determineNeededServiceUpdates(testCase.a, testCase.b, false) + if !retval.Equal(testCase.xor) { + t.Errorf("%s (with podChanged=false): expected: %v got: %v", testCase.name, testCase.xor.List(), retval.List()) + } + + retval = determineNeededServiceUpdates(testCase.a, testCase.b, true) + if !retval.Equal(testCase.union) { + t.Errorf("%s (with podChanged=true): expected: %v got: %v", testCase.name, testCase.union.List(), retval.List()) + } + } +} From 846354e97781a5123665456bcb29cbacb34c0f37 Mon Sep 17 00:00:00 2001 From: Joel Smith Date: Tue, 22 Aug 2017 19:22:50 -0600 Subject: [PATCH 2/2] UPSTREAM: 51144: Fix unready endpoints bug introduced in #50934 --- .../endpoint/endpoints_controller.go | 15 +++++++--- .../endpoint/endpoints_controller_test.go | 29 ++++++++++++------- 2 files changed, 29 insertions(+), 15 deletions(-) diff --git a/vendor/k8s.io/kubernetes/pkg/controller/endpoint/endpoints_controller.go b/vendor/k8s.io/kubernetes/pkg/controller/endpoint/endpoints_controller.go index 5c213b5bed0a..64767014d303 100644 --- a/vendor/k8s.io/kubernetes/pkg/controller/endpoint/endpoints_controller.go +++ b/vendor/k8s.io/kubernetes/pkg/controller/endpoint/endpoints_controller.go @@ -194,7 +194,14 @@ func podToEndpointAddress(pod *v1.Pod) *v1.EndpointAddress { }} } -func podAddressChanged(oldPod, newPod *v1.Pod) bool { +func podChanged(oldPod, newPod *v1.Pod) bool { + // If the pod's readiness has changed, the associated endpoint address + // will move from the unready endpoints set to the ready endpoints. + // So for the purposes of an endpoint, a readiness change on a pod + // means we have a changed pod. + if v1.IsPodReady(oldPod) != v1.IsPodReady(newPod) { + return true + } // Convert the pod to an EndpointAddress, clear inert fields, // and see if they are the same. newEndpointAddress := podToEndpointAddress(newPod) @@ -236,7 +243,7 @@ func (e *EndpointController) updatePod(old, cur interface{}) { return } - podChanged := podAddressChanged(oldPod, newPod) + podChangedFlag := podChanged(oldPod, newPod) // Check if the pod labels have changed, indicating a possibe // change in the service membership @@ -247,7 +254,7 @@ func (e *EndpointController) updatePod(old, cur interface{}) { } // If both the pod and labels are unchanged, no update is needed - if !podChanged && !labelsChanged { + if !podChangedFlag && !labelsChanged { return } @@ -263,7 +270,7 @@ func (e *EndpointController) updatePod(old, cur interface{}) { utilruntime.HandleError(fmt.Errorf("Unable to get pod %v/%v's service memberships: %v", oldPod.Namespace, oldPod.Name, err)) return } - services = determineNeededServiceUpdates(oldServices, services, podChanged) + services = determineNeededServiceUpdates(oldServices, services, podChangedFlag) } for key := range services { diff --git a/vendor/k8s.io/kubernetes/pkg/controller/endpoint/endpoints_controller_test.go b/vendor/k8s.io/kubernetes/pkg/controller/endpoint/endpoints_controller_test.go index e091e32ad4db..811de6f7c43d 100644 --- a/vendor/k8s.io/kubernetes/pkg/controller/endpoint/endpoints_controller_test.go +++ b/vendor/k8s.io/kubernetes/pkg/controller/endpoint/endpoints_controller_test.go @@ -597,7 +597,7 @@ func TestPodToEndpointAddress(t *testing.T) { } } -func TestPodAddressChanged(t *testing.T) { +func TestPodChanged(t *testing.T) { podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc) ns := "test" addPods(podStore, ns, 1, 1, 0) @@ -617,33 +617,40 @@ func TestPodAddressChanged(t *testing.T) { } newPod := copied - if podAddressChanged(oldPod, newPod) { - t.Errorf("Expected address to be unchanged for copied pod") + if podChanged(oldPod, newPod) { + t.Errorf("Expected pod to be unchanged for copied pod") } newPod.Spec.NodeName = "changed" - if !podAddressChanged(oldPod, newPod) { - t.Errorf("Expected address to be changed for pod with NodeName changed") + if !podChanged(oldPod, newPod) { + t.Errorf("Expected pod to be changed for pod with NodeName changed") } newPod.Spec.NodeName = oldPod.Spec.NodeName newPod.ObjectMeta.ResourceVersion = "changed" - if podAddressChanged(oldPod, newPod) { - t.Errorf("Expected address to be unchanged for pod with only ResourceVersion changed") + if podChanged(oldPod, newPod) { + t.Errorf("Expected pod to be unchanged for pod with only ResourceVersion changed") } newPod.ObjectMeta.ResourceVersion = oldPod.ObjectMeta.ResourceVersion newPod.Status.PodIP = "1.2.3.1" - if !podAddressChanged(oldPod, newPod) { - t.Errorf("Expected address to be changed with pod IP address change") + if !podChanged(oldPod, newPod) { + t.Errorf("Expected pod to be changed with pod IP address change") } newPod.Status.PodIP = oldPod.Status.PodIP newPod.ObjectMeta.Name = "wrong-name" - if !podAddressChanged(oldPod, newPod) { - t.Errorf("Expected address to be changed with pod name change") + if !podChanged(oldPod, newPod) { + t.Errorf("Expected pod to be changed with pod name change") } newPod.ObjectMeta.Name = oldPod.ObjectMeta.Name + + saveConditions := oldPod.Status.Conditions + oldPod.Status.Conditions = nil + if !podChanged(oldPod, newPod) { + t.Errorf("Expected pod to be changed with pod readiness change") + } + oldPod.Status.Conditions = saveConditions } func TestDetermineNeededServiceUpdates(t *testing.T) {