diff --git a/pkg/neg/syncers/transaction_test.go b/pkg/neg/syncers/transaction_test.go index 15acb6127e..5ecace6d89 100644 --- a/pkg/neg/syncers/transaction_test.go +++ b/pkg/neg/syncers/transaction_test.go @@ -1799,12 +1799,16 @@ func TestEnableDegradedMode(t *testing.T) { _, s := newTestTransactionSyncer(fakeCloud, negtypes.VmIpPortEndpointType, false) s.NegSyncerKey.NegName = tc.negName s.needInit = false - addPodsToLister(s.podLister) + addPodsToLister(s.podLister, getDefaultEndpointSlices()) for i := 1; i <= 4; i++ { s.nodeLister.Add(&corev1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("instance%v", i), }, + Spec: corev1.NodeSpec{ + PodCIDR: fmt.Sprintf("10.100.%v.0/24", i), + PodCIDRs: []string{fmt.Sprintf("200%v:db8::/48", i), fmt.Sprintf("10.100.%v.0/24", i)}, + }, }) } for _, eps := range tc.testEndpointSlices { diff --git a/pkg/neg/syncers/utils.go b/pkg/neg/syncers/utils.go index 7a84c7c04c..de3042bf99 100644 --- a/pkg/neg/syncers/utils.go +++ b/pkg/neg/syncers/utils.go @@ -486,6 +486,11 @@ func toZoneNetworkEndpointMapDegradedMode(eds []negtypes.EndpointsData, zoneGett } podIPs := ipsForPod[types.NamespacedName{Namespace: endpointAddress.TargetRef.Namespace, Name: endpointAddress.TargetRef.Name}] + // endpoint address should match to the IP of its pod + if err = podContainsEndpointAddress(podIPs.IP, pod); err != nil { + klog.V(2).Infof("Endpoint %q does not have an address %s that matches to the IP(s) of its pod %v: %w, skipping", endpointAddress.Addresses, podIPs.IP, pod.Status.PodIPs, err) + continue + } networkEndpoint := negtypes.NetworkEndpoint{IP: podIPs.IP, Port: matchPort, Node: nodeName} if enableDualStackNEG { // Convert all addresses to a standard form as per rfc5952 to prevent @@ -519,6 +524,7 @@ func toZoneNetworkEndpointMapDegradedMode(eds []negtypes.EndpointsData, zoneGett // it returns error if the pod: // 1. is in terminal state // 2. corresponds to a non-existent node +// 3. have an IP that matches to a podIP, but is outside of the node's allocated IP range func validatePod(pod *apiv1.Pod, nodeLister cache.Indexer) error { // Terminal Pod means a pod is in PodFailed or PodSucceeded phase phase := pod.Status.Phase @@ -529,10 +535,14 @@ func validatePod(pod *apiv1.Pod, nodeLister cache.Indexer) error { if err != nil || !exists { return negtypes.ErrEPNodeNotFound } - _, isNode := obj.(*apiv1.Node) + node, isNode := obj.(*apiv1.Node) if !isNode { return negtypes.ErrEPNodeTypeAssertionFailed } + if err = nodeContainsPodIP(node, pod); err != nil { + klog.V(2).Info("Pod %s/%s has an IP %v that is outside of the node's allocated IP range(s) %v, skipping", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name, pod.Status.PodIP, node.Spec.PodCIDRs) + return err + } return nil } @@ -562,6 +572,41 @@ func ipsForPod(eds []negtypes.EndpointsData) map[types.NamespacedName]negtypes.N return result } +// podContainsEndpointAddress checks the pod's existing PodIP(s) +// and return error if the given endpoint address does not any of them. +func podContainsEndpointAddress(endpointAddr string, pod *apiv1.Pod) error { + // a pod can have at most two PodIPs, one for ipv4 and one for ipv6 + for _, podIP := range pod.Status.PodIPs { + if endpointAddr == podIP.IP { + return nil + } + } + return negtypes.ErrEPIPNotFromPod +} + +// nodeContainsPodIP checks the node's existing PodCIDR(s) +// and return error if the given podIP is not within one of the ranges +func nodeContainsPodIP(node *apiv1.Node, pod *apiv1.Pod) error { + ipnets := []*net.IPNet{} + // a node can have at most two PodCIDRs, one for ipv4 and one for ipv6 + for _, podCIDR := range node.Spec.PodCIDRs { + podCIDR = strings.TrimSpace(podCIDR) + _, ipnet, err := net.ParseCIDR(podCIDR) + if err != nil { + // swallow errors for CIDRs that are invalid + continue + } + ipnets = append(ipnets, ipnet) + } + podIP := net.ParseIP(pod.Status.PodIP) + for _, net := range ipnets { + if net.Contains(podIP) { + return nil + } + } + return negtypes.ErrEPIPOutOfPodCIDR +} + // retrieveExistingZoneNetworkEndpointMap lists existing network endpoints in the neg and return the zone and endpoints map func retrieveExistingZoneNetworkEndpointMap(negName string, zoneGetter negtypes.ZoneGetter, cloud negtypes.NetworkEndpointGroupCloud, version meta.Version, mode negtypes.EndpointsCalculatorMode) (map[string]negtypes.NetworkEndpointSet, labels.EndpointPodLabelMap, error) { // Include zones that have non-candidate nodes currently. It is possible that NEGs were created in those zones previously and the endpoints now became non-candidates. diff --git a/pkg/neg/syncers/utils_test.go b/pkg/neg/syncers/utils_test.go index be81853e63..ffc4179a9f 100644 --- a/pkg/neg/syncers/utils_test.go +++ b/pkg/neg/syncers/utils_test.go @@ -22,12 +22,12 @@ import ( "net" "reflect" "strconv" + "strings" "testing" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" "github.com/google/go-cmp/cmp" - corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -694,7 +694,8 @@ func TestToZoneNetworkEndpointMap(t *testing.T) { t.Parallel() zoneGetter := negtypes.NewFakeZoneGetter() podLister := negtypes.NewTestContext().PodInformer.GetIndexer() - addPodsToLister(podLister) + testEndpointSlice := getDefaultEndpointSlices() + addPodsToLister(podLister, testEndpointSlice) testCases := []struct { desc string portName string @@ -1738,14 +1739,18 @@ func TestToZoneNetworkEndpointMapDegradedMode(t *testing.T) { fakeZoneGetter := negtypes.NewFakeZoneGetter() testContext := negtypes.NewTestContext() podLister := testContext.PodInformer.GetIndexer() - addPodsToLister(podLister) + addPodsToLister(podLister, getDefaultEndpointSlices()) nodeLister := testContext.NodeInformer.GetIndexer() for i := 1; i <= 4; i++ { - nodeLister.Add(&corev1.Node{ + nodeLister.Add(&v1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("instance%v", i), }, + Spec: v1.NodeSpec{ + PodCIDR: fmt.Sprintf("10.100.%v.0/24", i), + PodCIDRs: []string{fmt.Sprintf("200%v:db8::/48", i), fmt.Sprintf("10.100.%v.0/24", i)}, + }, }) } @@ -1861,16 +1866,21 @@ func TestDegradedModeValidateEndpointInfo(t *testing.T) { port80 := int32(80) protocolTCP := v1.ProtocolTCP instance1 := negtypes.TestInstance1 + instance2 := negtypes.TestInstance2 fakeZoneGetter := negtypes.NewFakeZoneGetter() testContext := negtypes.NewTestContext() podLister := testContext.PodInformer.GetIndexer() - addPodsToLister(podLister) + addPodsToLister(podLister, getDefaultEndpointSlices()) nodeLister := testContext.NodeInformer.GetIndexer() - nodeLister.Add(&corev1.Node{ + nodeLister.Add(&v1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: instance1, }, + Spec: v1.NodeSpec{ + PodCIDR: "10.100.1.0/24", + PodCIDRs: []string{"2001:db8::/48", "10.100.1.0/24"}, + }, }) endpointMap := map[string]negtypes.NetworkEndpointSet{ @@ -1977,6 +1987,57 @@ func TestDegradedModeValidateEndpointInfo(t *testing.T) { expectedEndpointMap: endpointMap, expectedPodMap: podMap, }, + { + desc: "endpoint with IP not matching to its pod, endpoint should be removed", + testEndpointSlices: []*discovery.EndpointSlice{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: testServiceName + "-1", + Namespace: testServiceNamespace, + Labels: map[string]string{ + discovery.LabelServiceName: testServiceName, + }, + }, + AddressType: "IPv4", + Endpoints: []discovery.Endpoint{ + { + Addresses: []string{"10.100.1.1"}, + NodeName: &instance1, + TargetRef: &v1.ObjectReference{ + Namespace: testServiceNamespace, + Name: "pod1", + }, + }, + { + Addresses: []string{"10.100.1.2"}, + NodeName: &instance1, + TargetRef: &v1.ObjectReference{ + Namespace: testServiceNamespace, + Name: "pod2", + }, + }, + { + Addresses: []string{"10.100.2.2"}, // the IP of this pod is 10.100.2.1 + NodeName: &instance2, + TargetRef: &v1.ObjectReference{ + Namespace: testServiceNamespace, + Name: "pod3", + }, + }, + }, + Ports: []discovery.EndpointPort{ + { + Name: &emptyNamedPort, + Port: &port80, + Protocol: &protocolTCP, + }, + }, + }, + }, + endpointType: negtypes.VmIpPortEndpointType, + expectedEndpointMap: endpointMap, + expectedPodMap: podMap, + }, } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { @@ -1998,18 +2059,24 @@ func TestValidatePod(t *testing.T) { testNodeNonExistent := "node-non-existent" testContext := negtypes.NewTestContext() nodeLister := testContext.NodeInformer.GetIndexer() - nodeLister.Add(&corev1.Node{ + nodeLister.Add(&v1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: instance1, }, + Spec: v1.NodeSpec{ + PodCIDR: "10.100.1.0/24", + PodCIDRs: []string{"2001:db8::/48", "10.100.1.0/24"}, + }, }) + testPodIPv4 := "10.100.1.1" + testPodIPv6 := "2001:db8::2:1" testCases := []struct { desc string pod *v1.Pod expectErr error }{ { - desc: "a valid pod with phase running", + desc: "a valid pod with IPv4 address and phase running", pod: &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Namespace: testServiceNamespace, @@ -2017,24 +2084,43 @@ func TestValidatePod(t *testing.T) { }, Status: v1.PodStatus{ Phase: v1.PodRunning, + PodIP: testPodIPv4, }, - Spec: corev1.PodSpec{ + Spec: v1.PodSpec{ NodeName: instance1, }, }, expectErr: nil, }, { - desc: "a terminal pod with phase failed", + desc: "a valid pod with IPv6 address and phase running", pod: &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Namespace: testServiceNamespace, Name: "pod2", }, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + PodIP: testPodIPv6, + }, + Spec: v1.PodSpec{ + NodeName: instance1, + }, + }, + expectErr: nil, + }, + { + desc: "a terminal pod with phase failed", + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: testServiceNamespace, + Name: "pod3", + }, Status: v1.PodStatus{ Phase: v1.PodFailed, + PodIP: testPodIPv4, }, - Spec: corev1.PodSpec{ + Spec: v1.PodSpec{ NodeName: instance1, }, }, @@ -2045,12 +2131,13 @@ func TestValidatePod(t *testing.T) { pod: &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Namespace: testServiceNamespace, - Name: "pod3", + Name: "pod4", }, Status: v1.PodStatus{ Phase: v1.PodSucceeded, + PodIP: testPodIPv4, }, - Spec: corev1.PodSpec{ + Spec: v1.PodSpec{ NodeName: instance1, }, }, @@ -2058,20 +2145,55 @@ func TestValidatePod(t *testing.T) { }, { desc: "a pod from non-existent node", - pod: &corev1.Pod{ + pod: &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Namespace: testServiceNamespace, - Name: "pod4", + Name: "pod5", }, - Status: corev1.PodStatus{ - Phase: corev1.PodRunning, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + PodIP: testPodIPv4, }, - Spec: corev1.PodSpec{ + Spec: v1.PodSpec{ NodeName: testNodeNonExistent, }, }, expectErr: negtypes.ErrEPNodeNotFound, }, + { + desc: "a pod with IPv4 IP adress outside of the node's allocated pod range", + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: testServiceNamespace, + Name: "pod6", + }, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + PodIP: "10.101.1.1", + }, + Spec: v1.PodSpec{ + NodeName: instance1, + }, + }, + expectErr: negtypes.ErrEPIPOutOfPodCIDR, + }, + { + desc: "a pod with IPv6 IP address outside of the node's allocated pod range", + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: testServiceNamespace, + Name: "pod7", + }, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + PodIP: "2001:db9::2:1", + }, + Spec: v1.PodSpec{ + NodeName: instance1, + }, + }, + expectErr: negtypes.ErrEPIPOutOfPodCIDR, + }, } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { @@ -2122,85 +2244,38 @@ func TestParseIPAddress(t *testing.T) { } } -func addPodsToLister(podLister cache.Indexer) { - // add all pods in default endpoint into podLister - for i := 1; i <= 6; i++ { - podLister.Add(&corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: testServiceNamespace, - Name: fmt.Sprintf("pod%v", i), - }, - Status: corev1.PodStatus{ - Phase: v1.PodRunning, - }, - Spec: corev1.PodSpec{ - NodeName: testInstance1, - }, - }) +func addPodsToLister(podLister cache.Indexer, endpointSlices []*discovery.EndpointSlice) { + // collect both ipv4 and ipv6 IP address for pods + podToIPs := make(map[string][]v1.PodIP) + podToNodeName := make(map[string]string) + for _, eps := range endpointSlices { + for _, ep := range eps.Endpoints { + pod := fmt.Sprintf("%s/%s", ep.TargetRef.Namespace, ep.TargetRef.Name) + podToNodeName[pod] = *ep.NodeName + for _, addr := range ep.Addresses { + podToIPs[pod] = append(podToIPs[pod], v1.PodIP{IP: addr}) + } + } } - for i := 7; i <= 12; i++ { - podLister.Add(&corev1.Pod{ + for pod, IPs := range podToIPs { + strs := strings.Split(pod, "/") + podNamespace := strs[0] + podName := strs[1] + podLister.Add(&v1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Namespace: testServiceNamespace, - Name: fmt.Sprintf("pod%v", i), + Namespace: podNamespace, + Name: podName, }, - Status: corev1.PodStatus{ - Phase: v1.PodRunning, + Spec: v1.PodSpec{ + NodeName: podToNodeName[pod], }, - Spec: corev1.PodSpec{ - NodeName: testInstance4, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + PodIP: IPs[0].IP, + PodIPs: IPs, }, }) } - - podLister.Update(&corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: testServiceNamespace, - Name: "pod3", - }, - Status: corev1.PodStatus{ - Phase: v1.PodRunning, - }, - Spec: corev1.PodSpec{ - NodeName: testInstance2, - }, - }) - podLister.Update(&corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: testServiceNamespace, - Name: "pod4", - }, - Status: corev1.PodStatus{ - Phase: v1.PodRunning, - }, - Spec: corev1.PodSpec{ - NodeName: testInstance3, - }, - }) - podLister.Update(&corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: testServiceNamespace, - Name: "pod7", - }, - Status: corev1.PodStatus{ - Phase: v1.PodRunning, - }, - Spec: corev1.PodSpec{ - NodeName: testInstance2, - }, - }) - podLister.Update(&corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: testServiceNamespace, - Name: "pod10", - }, - Status: corev1.PodStatus{ - Phase: v1.PodRunning, - }, - Spec: corev1.PodSpec{ - NodeName: testInstance3, - }, - }) } // numToIP converts the given number to an IP address. diff --git a/pkg/neg/types/sync_errors.go b/pkg/neg/types/sync_errors.go index c7743d358e..001e351efe 100644 --- a/pkg/neg/types/sync_errors.go +++ b/pkg/neg/types/sync_errors.go @@ -32,6 +32,8 @@ const ( ReasonInvalidAPIResponse = Reason("InvalidAPIResponse") ReasonInvalidEPAttach = Reason("InvalidEPAttach") ReasonInvalidEPDetach = Reason("InvalidEPDetach") + ReasonEPIPNotFromPod = Reason("EPIPNotFromPod") + ReasonEPIPOutOfPodCIDR = Reason("EPIPOutOfPodCIDR") // these are for non error-state error ReasonNegNotFound = Reason("NegNotFound") @@ -112,6 +114,17 @@ var ( Reason: ReasonInvalidEPDetach, IsErrorState: true, } + ErrEPIPNotFromPod = NegSyncError{ + Err: errors.New("endpoint has an IP that does not correspond to its pod"), + Reason: ReasonEPIPNotFromPod, + IsErrorState: true, + } + ErrEPIPOutOfPodCIDR = NegSyncError{ + Err: errors.New("endpoint has an IP out of PodCIDR range"), + Reason: ReasonEPIPOutOfPodCIDR, + IsErrorState: true, + } + ErrNegNotFound = NegSyncError{ Err: errors.New("failed to get NEG for service"), Reason: ReasonNegNotFound,