From b96225c814cb866d82faa538554159ff093bdb1c Mon Sep 17 00:00:00 2001 From: David Cheung Date: Tue, 4 Apr 2023 17:17:15 +0000 Subject: [PATCH] Filter pods that don't belong to the service in question Filter pods that don't have labels match to its service label selector. --- pkg/neg/manager.go | 3 +- pkg/neg/syncers/endpoints_calculator.go | 13 +- pkg/neg/syncers/endpoints_calculator_test.go | 3 +- pkg/neg/syncers/transaction.go | 3 +- pkg/neg/syncers/transaction_test.go | 17 +- pkg/neg/syncers/utils.go | 34 +++- pkg/neg/syncers/utils_test.go | 166 +++++++++++++++++-- pkg/neg/types/sync_errors.go | 12 ++ 8 files changed, 228 insertions(+), 23 deletions(-) diff --git a/pkg/neg/manager.go b/pkg/neg/manager.go index d136cfe443..5fb17b92e2 100644 --- a/pkg/neg/manager.go +++ b/pkg/neg/manager.go @@ -228,8 +228,9 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg // determine the implementation that calculates NEG endpoints on each sync. epc := negsyncer.GetEndpointsCalculator( - manager.nodeLister, manager.podLister, + manager.nodeLister, + manager.serviceLister, manager.zoneGetter, syncerKey, portInfo.EpCalculatorMode, diff --git a/pkg/neg/syncers/endpoints_calculator.go b/pkg/neg/syncers/endpoints_calculator.go index 3f6b3a67ab..d962f4f892 100644 --- a/pkg/neg/syncers/endpoints_calculator.go +++ b/pkg/neg/syncers/endpoints_calculator.go @@ -189,17 +189,26 @@ type L7EndpointsCalculator struct { servicePortName string podLister cache.Indexer nodeLister cache.Indexer + serviceLister cache.Indexer networkEndpointType types.NetworkEndpointType enableDualStackNEG bool logger klog.Logger } -func NewL7EndpointsCalculator(zoneGetter types.ZoneGetter, podLister, nodeLister cache.Indexer, svcPortName string, endpointType types.NetworkEndpointType, logger klog.Logger, enableDualStackNEG bool) *L7EndpointsCalculator { +func NewL7EndpointsCalculator( + zoneGetter types.ZoneGetter, + podLister, nodeLister, serviceLister cache.Indexer, + svcPortName string, + endpointType types.NetworkEndpointType, + logger klog.Logger, + enableDualStackNEG bool, +) *L7EndpointsCalculator { return &L7EndpointsCalculator{ zoneGetter: zoneGetter, servicePortName: svcPortName, podLister: podLister, nodeLister: nodeLister, + serviceLister: serviceLister, networkEndpointType: endpointType, enableDualStackNEG: enableDualStackNEG, logger: logger.WithName("L7EndpointsCalculator"), @@ -219,7 +228,7 @@ func (l *L7EndpointsCalculator) CalculateEndpoints(eds []types.EndpointsData, _ // CalculateEndpoints determines the endpoints in the NEGs based on the current service endpoints and the current NEGs. func (l *L7EndpointsCalculator) CalculateEndpointsDegradedMode(eds []types.EndpointsData, _ map[string]types.NetworkEndpointSet) (map[string]types.NetworkEndpointSet, types.EndpointPodMap, error) { - result := toZoneNetworkEndpointMapDegradedMode(eds, l.zoneGetter, l.podLister, l.nodeLister, l.servicePortName, l.networkEndpointType, l.enableDualStackNEG) + result := toZoneNetworkEndpointMapDegradedMode(eds, l.zoneGetter, l.podLister, l.nodeLister, l.serviceLister, l.servicePortName, l.networkEndpointType, l.enableDualStackNEG) return result.NetworkEndpointSet, result.EndpointPodMap, nil } diff --git a/pkg/neg/syncers/endpoints_calculator_test.go b/pkg/neg/syncers/endpoints_calculator_test.go index 75daab7b4e..ef9a989607 100644 --- a/pkg/neg/syncers/endpoints_calculator_test.go +++ b/pkg/neg/syncers/endpoints_calculator_test.go @@ -233,7 +233,8 @@ func TestValidateEndpoints(t *testing.T) { testContext := negtypes.NewTestContext() podLister := testContext.PodInformer.GetIndexer() nodeLister := testContext.NodeInformer.GetIndexer() - L7EndpointsCalculator := NewL7EndpointsCalculator(zoneGetter, podLister, nodeLister, testPortName, negtypes.VmIpPortEndpointType, klog.TODO(), testContext.EnableDualStackNEG) + serviceLister := testContext.ServiceInformer.GetIndexer() + L7EndpointsCalculator := NewL7EndpointsCalculator(zoneGetter, podLister, nodeLister, serviceLister, testPortName, negtypes.VmIpPortEndpointType, klog.TODO(), testContext.EnableDualStackNEG) L4LocalEndpointCalculator := NewLocalL4ILBEndpointsCalculator(listers.NewNodeLister(nodeLister), zoneGetter, svcKey, klog.TODO()) L4ClusterEndpointCalculator := NewClusterL4ILBEndpointsCalculator(listers.NewNodeLister(nodeLister), zoneGetter, svcKey, klog.TODO()) diff --git a/pkg/neg/syncers/transaction.go b/pkg/neg/syncers/transaction.go index bbb8d513b4..d47fbeda78 100644 --- a/pkg/neg/syncers/transaction.go +++ b/pkg/neg/syncers/transaction.go @@ -173,7 +173,7 @@ func NewTransactionSyncer( return syncer } -func GetEndpointsCalculator(nodeLister, podLister cache.Indexer, zoneGetter negtypes.ZoneGetter, syncerKey negtypes.NegSyncerKey, mode negtypes.EndpointsCalculatorMode, logger klog.Logger, enableDualStackNEG bool) negtypes.NetworkEndpointsCalculator { +func GetEndpointsCalculator(podLister, nodeLister, serviceLister cache.Indexer, zoneGetter negtypes.ZoneGetter, syncerKey negtypes.NegSyncerKey, mode negtypes.EndpointsCalculatorMode, logger klog.Logger, enableDualStackNEG bool) negtypes.NetworkEndpointsCalculator { serviceKey := strings.Join([]string{syncerKey.Name, syncerKey.Namespace}, "/") if syncerKey.NegType == negtypes.VmIpEndpointType { nodeLister := listers.NewNodeLister(nodeLister) @@ -188,6 +188,7 @@ func GetEndpointsCalculator(nodeLister, podLister cache.Indexer, zoneGetter negt zoneGetter, podLister, nodeLister, + serviceLister, syncerKey.PortTuple.Name, syncerKey.NegType, logger, diff --git a/pkg/neg/syncers/transaction_test.go b/pkg/neg/syncers/transaction_test.go index 5ecace6d89..3d06109e39 100644 --- a/pkg/neg/syncers/transaction_test.go +++ b/pkg/neg/syncers/transaction_test.go @@ -1811,6 +1811,18 @@ func TestEnableDegradedMode(t *testing.T) { }, }) } + testLabels := map[string]string{ + "run": "foo", + } // this should match to pod labels + s.serviceLister.Add(&corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: testServiceNamespace, + Name: testServiceName, + }, + Spec: corev1.ServiceSpec{ + Selector: testLabels, + }, + }) for _, eps := range tc.testEndpointSlices { s.endpointSliceLister.Add(eps) } @@ -2073,7 +2085,7 @@ func TestCollectLabelStats(t *testing.T) { func newL4ILBTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, mode negtypes.EndpointsCalculatorMode) (negtypes.NegSyncer, *transactionSyncer) { negsyncer, ts := newTestTransactionSyncer(fakeGCE, negtypes.VmIpEndpointType, false) - ts.endpointsCalculator = GetEndpointsCalculator(ts.nodeLister, ts.podLister, ts.zoneGetter, ts.NegSyncerKey, mode, klog.TODO(), ts.enableDualStackNEG) + ts.endpointsCalculator = GetEndpointsCalculator(ts.podLister, ts.nodeLister, ts.serviceLister, ts.zoneGetter, ts.NegSyncerKey, mode, klog.TODO(), ts.enableDualStackNEG) return negsyncer, ts } @@ -2114,7 +2126,8 @@ func newTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, negTyp testContext.NodeInformer.GetIndexer(), testContext.SvcNegInformer.GetIndexer(), reflector, - GetEndpointsCalculator(testContext.NodeInformer.GetIndexer(), testContext.PodInformer.GetIndexer(), fakeZoneGetter, svcPort, mode, klog.TODO(), testContext.EnableDualStackNEG), + GetEndpointsCalculator(testContext.PodInformer.GetIndexer(), testContext.NodeInformer.GetIndexer(), testContext.ServiceInformer.GetIndexer(), + fakeZoneGetter, svcPort, mode, klog.TODO(), testContext.EnableDualStackNEG), string(kubeSystemUID), testContext.SvcNegClient, metrics.FakeSyncerMetrics(), diff --git a/pkg/neg/syncers/utils.go b/pkg/neg/syncers/utils.go index de3042bf99..dbc2906df9 100644 --- a/pkg/neg/syncers/utils.go +++ b/pkg/neg/syncers/utils.go @@ -445,7 +445,7 @@ func getEndpointPod( // toZoneNetworkEndpointMap translates addresses in endpoints object into zone and endpoints map, and also return the count for duplicated endpoints // we will not raise error in degraded mode for misconfigured endpoints, instead they will be filtered directly -func toZoneNetworkEndpointMapDegradedMode(eds []negtypes.EndpointsData, zoneGetter negtypes.ZoneGetter, podLister, nodeLister cache.Indexer, servicePortName string, networkEndpointType negtypes.NetworkEndpointType, enableDualStackNEG bool) ZoneNetworkEndpointMapResult { +func toZoneNetworkEndpointMapDegradedMode(eds []negtypes.EndpointsData, zoneGetter negtypes.ZoneGetter, podLister, nodeLister, serviceLister cache.Indexer, servicePortName string, networkEndpointType negtypes.NetworkEndpointType, enableDualStackNEG bool) ZoneNetworkEndpointMapResult { zoneNetworkEndpointMap := map[string]negtypes.NetworkEndpointSet{} networkEndpointPodMap := negtypes.EndpointPodMap{} dupCount := 0 @@ -461,6 +461,8 @@ func toZoneNetworkEndpointMapDegradedMode(eds []negtypes.EndpointsData, zoneGett if len(matchPort) == 0 { continue } + serviceName := ed.Meta.Labels[discovery.LabelServiceName] + isCustomEPS := ed.Meta.Labels[discovery.LabelManagedBy] != "endpointslice-controller.k8s.io" for _, endpointAddress := range ed.Addresses { if !enableDualStackNEG && endpointAddress.AddressType != discovery.AddressTypeIPv4 { klog.Infof("Skipping non IPv4 address in degraded mode: %q, in endpoint slice %s/%s", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name) @@ -471,7 +473,7 @@ func toZoneNetworkEndpointMapDegradedMode(eds []negtypes.EndpointsData, zoneGett klog.Errorf("Endpoint %q in Endpoints %s/%s receives error when getting pod, err: %v, skipping", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name, getPodErr) continue } - if err := validatePod(pod, nodeLister); err != nil { + if err := validatePod(pod, nodeLister, serviceLister, serviceName, isCustomEPS); err != nil { klog.Errorf("Endpoint %q in Endpoints %s/%s correponds to an invalid pod: %v, skipping", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name, err) continue } @@ -525,7 +527,8 @@ func toZoneNetworkEndpointMapDegradedMode(eds []negtypes.EndpointsData, zoneGett // 1. is in terminal state // 2. corresponds to a non-existent node // 3. have an IP that matches to a podIP, but is outside of the node's allocated IP range -func validatePod(pod *apiv1.Pod, nodeLister cache.Indexer) error { +// 4. has labels not matching to its service's label selector +func validatePod(pod *apiv1.Pod, nodeLister, serviceLister cache.Indexer, serviceName string, isCustomEPS bool) error { // Terminal Pod means a pod is in PodFailed or PodSucceeded phase phase := pod.Status.Phase if phase == apiv1.PodFailed || phase == apiv1.PodSucceeded { @@ -543,6 +546,18 @@ func validatePod(pod *apiv1.Pod, nodeLister cache.Indexer) error { klog.V(2).Info("Pod %s/%s has an IP %v that is outside of the node's allocated IP range(s) %v, skipping", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name, pod.Status.PodIP, node.Spec.PodCIDRs) return err } + service := getService(serviceLister, pod.ObjectMeta.Namespace, serviceName) + if service == nil { + klog.V(2).Info("Endpoint does not correspond to an existing service %s, skipping", serviceName) + return negtypes.ErrEPServiceNotFound + } + if isCustomEPS { + return nil + } + if err = podBelongsToService(pod, service); err != nil { + klog.V(2).Info("Pod %s/%s labels %v does not match service %s/%s selector %v", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name, pod.Labels, service.Namespace, service.Name, service.Spec.Selector) + return err + } return nil } @@ -607,6 +622,19 @@ func nodeContainsPodIP(node *apiv1.Node, pod *apiv1.Pod) error { return negtypes.ErrEPIPOutOfPodCIDR } +// podBelongsToService checks the pod's labels +// and return error if any label specified in the service's label selector is not in the pod's labels +func podBelongsToService(pod *apiv1.Pod, service *apiv1.Service) error { + podLabels := pod.ObjectMeta.Labels + serviceLabels := service.Spec.Selector + for key, val1 := range serviceLabels { + if val2, contains := podLabels[key]; !contains || val1 != val2 { + return negtypes.ErrEPPodLabelMismatch + } + } + return nil +} + // retrieveExistingZoneNetworkEndpointMap lists existing network endpoints in the neg and return the zone and endpoints map func retrieveExistingZoneNetworkEndpointMap(negName string, zoneGetter negtypes.ZoneGetter, cloud negtypes.NetworkEndpointGroupCloud, version meta.Version, mode negtypes.EndpointsCalculatorMode) (map[string]negtypes.NetworkEndpointSet, labels.EndpointPodLabelMap, error) { // Include zones that have non-candidate nodes currently. It is possible that NEGs were created in those zones previously and the endpoints now became non-candidates. diff --git a/pkg/neg/syncers/utils_test.go b/pkg/neg/syncers/utils_test.go index ffc4179a9f..ee2104349e 100644 --- a/pkg/neg/syncers/utils_test.go +++ b/pkg/neg/syncers/utils_test.go @@ -1753,6 +1753,19 @@ func TestToZoneNetworkEndpointMapDegradedMode(t *testing.T) { }, }) } + testLabels := map[string]string{ + "run": "foo", + } + serviceLister := testContext.ServiceInformer.GetIndexer() + serviceLister.Add(&v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: testServiceNamespace, + Name: testServiceName, + }, + Spec: v1.ServiceSpec{ + Selector: testLabels, + }, + }) testNonExistPort := "non-exists" testEmptyNamedPort := "" @@ -1848,7 +1861,7 @@ func TestToZoneNetworkEndpointMapDegradedMode(t *testing.T) { } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - result := toZoneNetworkEndpointMapDegradedMode(negtypes.EndpointsDataFromEndpointSlices(tc.testEndpointSlices), fakeZoneGetter, podLister, nodeLister, tc.portName, tc.networkEndpointType, false) + result := toZoneNetworkEndpointMapDegradedMode(negtypes.EndpointsDataFromEndpointSlices(tc.testEndpointSlices), fakeZoneGetter, podLister, nodeLister, serviceLister, tc.portName, tc.networkEndpointType, false) if !reflect.DeepEqual(result.NetworkEndpointSet, tc.expectedEndpointMap) { t.Errorf("degraded mode endpoint set is not calculated correctly:\ngot %+v,\n expected %+v", result.NetworkEndpointSet, tc.expectedEndpointMap) } @@ -1883,6 +1896,21 @@ func TestDegradedModeValidateEndpointInfo(t *testing.T) { }, }) + testLabels := map[string]string{ + "run": "foo", + } + serviceLister := testContext.ServiceInformer.GetIndexer() + managedByController := "endpointslice-controller.k8s.io" + serviceLister.Add(&v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: testServiceNamespace, + Name: testServiceName, + }, + Spec: v1.ServiceSpec{ + Selector: testLabels, + }, + }) + endpointMap := map[string]negtypes.NetworkEndpointSet{ negtypes.TestZone1: negtypes.NewNetworkEndpointSet( networkEndpointFromEncodedEndpoint("10.100.1.1||instance1||80"), @@ -1910,6 +1938,7 @@ func TestDegradedModeValidateEndpointInfo(t *testing.T) { Namespace: testServiceNamespace, Labels: map[string]string{ discovery.LabelServiceName: testServiceName, + discovery.LabelManagedBy: managedByController, }, }, AddressType: "IPv4", @@ -1953,6 +1982,7 @@ func TestDegradedModeValidateEndpointInfo(t *testing.T) { Namespace: testServiceNamespace, Labels: map[string]string{ discovery.LabelServiceName: testServiceName, + discovery.LabelManagedBy: managedByController, }, }, AddressType: "IPv4", @@ -1996,6 +2026,7 @@ func TestDegradedModeValidateEndpointInfo(t *testing.T) { Namespace: testServiceNamespace, Labels: map[string]string{ discovery.LabelServiceName: testServiceName, + discovery.LabelManagedBy: managedByController, }, }, AddressType: "IPv4", @@ -2041,7 +2072,7 @@ func TestDegradedModeValidateEndpointInfo(t *testing.T) { } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - result := toZoneNetworkEndpointMapDegradedMode(negtypes.EndpointsDataFromEndpointSlices(tc.testEndpointSlices), fakeZoneGetter, podLister, nodeLister, emptyNamedPort, tc.endpointType, false) + result := toZoneNetworkEndpointMapDegradedMode(negtypes.EndpointsDataFromEndpointSlices(tc.testEndpointSlices), fakeZoneGetter, podLister, nodeLister, serviceLister, emptyNamedPort, tc.endpointType, false) if !reflect.DeepEqual(result.NetworkEndpointSet, tc.expectedEndpointMap) { t.Errorf("degraded mode endpoint set is not calculated correctly:\ngot %+v,\n expected %+v", result.NetworkEndpointSet, tc.expectedEndpointMap) } @@ -2070,10 +2101,29 @@ func TestValidatePod(t *testing.T) { }) testPodIPv4 := "10.100.1.1" testPodIPv6 := "2001:db8::2:1" + testLabels1 := map[string]string{ + "run": "foo", + } + testLabels2 := map[string]string{ + "run": "bar", + } + serviceLister := testContext.ServiceInformer.GetIndexer() + serviceLister.Add(&v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: testServiceNamespace, + Name: testServiceName, + }, + Spec: v1.ServiceSpec{ + Selector: testLabels1, + }, + }) + testServiceNameNotFound := "foo" testCases := []struct { - desc string - pod *v1.Pod - expectErr error + desc string + pod *v1.Pod + serviceName string + isCustomEPS bool + expectErr error }{ { desc: "a valid pod with IPv4 address and phase running", @@ -2081,6 +2131,7 @@ func TestValidatePod(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Namespace: testServiceNamespace, Name: "pod1", + Labels: testLabels1, }, Status: v1.PodStatus{ Phase: v1.PodRunning, @@ -2090,7 +2141,9 @@ func TestValidatePod(t *testing.T) { NodeName: instance1, }, }, - expectErr: nil, + serviceName: testServiceName, + isCustomEPS: false, + expectErr: nil, }, { desc: "a valid pod with IPv6 address and phase running", @@ -2098,6 +2151,7 @@ func TestValidatePod(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Namespace: testServiceNamespace, Name: "pod2", + Labels: testLabels1, }, Status: v1.PodStatus{ Phase: v1.PodRunning, @@ -2107,7 +2161,9 @@ func TestValidatePod(t *testing.T) { NodeName: instance1, }, }, - expectErr: nil, + serviceName: testServiceName, + isCustomEPS: false, + expectErr: nil, }, { desc: "a terminal pod with phase failed", @@ -2115,6 +2171,7 @@ func TestValidatePod(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Namespace: testServiceNamespace, Name: "pod3", + Labels: testLabels1, }, Status: v1.PodStatus{ Phase: v1.PodFailed, @@ -2124,7 +2181,9 @@ func TestValidatePod(t *testing.T) { NodeName: instance1, }, }, - expectErr: negtypes.ErrEPPodTerminal, + serviceName: testServiceName, + isCustomEPS: false, + expectErr: negtypes.ErrEPPodTerminal, }, { desc: "a terminal pod with phase succeeded", @@ -2132,6 +2191,7 @@ func TestValidatePod(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Namespace: testServiceNamespace, Name: "pod4", + Labels: testLabels1, }, Status: v1.PodStatus{ Phase: v1.PodSucceeded, @@ -2141,7 +2201,9 @@ func TestValidatePod(t *testing.T) { NodeName: instance1, }, }, - expectErr: negtypes.ErrEPPodTerminal, + serviceName: testServiceName, + isCustomEPS: false, + expectErr: negtypes.ErrEPPodTerminal, }, { desc: "a pod from non-existent node", @@ -2149,6 +2211,7 @@ func TestValidatePod(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Namespace: testServiceNamespace, Name: "pod5", + Labels: testLabels1, }, Status: v1.PodStatus{ Phase: v1.PodRunning, @@ -2158,7 +2221,9 @@ func TestValidatePod(t *testing.T) { NodeName: testNodeNonExistent, }, }, - expectErr: negtypes.ErrEPNodeNotFound, + serviceName: testServiceName, + isCustomEPS: false, + expectErr: negtypes.ErrEPNodeNotFound, }, { desc: "a pod with IPv4 IP adress outside of the node's allocated pod range", @@ -2166,6 +2231,7 @@ func TestValidatePod(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Namespace: testServiceNamespace, Name: "pod6", + Labels: testLabels1, }, Status: v1.PodStatus{ Phase: v1.PodRunning, @@ -2175,7 +2241,9 @@ func TestValidatePod(t *testing.T) { NodeName: instance1, }, }, - expectErr: negtypes.ErrEPIPOutOfPodCIDR, + serviceName: testServiceName, + isCustomEPS: false, + expectErr: negtypes.ErrEPIPOutOfPodCIDR, }, { desc: "a pod with IPv6 IP address outside of the node's allocated pod range", @@ -2183,6 +2251,7 @@ func TestValidatePod(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Namespace: testServiceNamespace, Name: "pod7", + Labels: testLabels1, }, Status: v1.PodStatus{ Phase: v1.PodRunning, @@ -2192,12 +2261,74 @@ func TestValidatePod(t *testing.T) { NodeName: instance1, }, }, - expectErr: negtypes.ErrEPIPOutOfPodCIDR, + serviceName: testServiceName, + isCustomEPS: false, + expectErr: negtypes.ErrEPIPOutOfPodCIDR, + }, + { + desc: "a pod with non-existing service name", + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: testServiceNamespace, + Name: "pod8", + Labels: testLabels1, + }, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + PodIP: testPodIPv4, + }, + Spec: v1.PodSpec{ + NodeName: instance1, + }, + }, + serviceName: testServiceNameNotFound, + isCustomEPS: false, + expectErr: negtypes.ErrEPServiceNotFound, + }, + { + desc: "a pod referenced by a non-custom endpoint slice, with labels not matching to service's label selector", + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: testServiceNamespace, + Name: "pod9", + Labels: testLabels2, + }, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + PodIP: testPodIPv4, + }, + Spec: v1.PodSpec{ + NodeName: instance1, + }, + }, + serviceName: testServiceName, + isCustomEPS: false, + expectErr: negtypes.ErrEPPodLabelMismatch, + }, + { + desc: "a pod referenced by a custom endpoint slice, with labels not matching to service's label selector", + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: testServiceNamespace, + Name: "pod10", + Labels: testLabels2, + }, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + PodIP: testPodIPv4, + }, + Spec: v1.PodSpec{ + NodeName: instance1, + }, + }, + serviceName: testServiceName, + isCustomEPS: true, // for custom endpoint slice, we won't check the pod's labels + expectErr: nil, }, } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - if got := validatePod(tc.pod, nodeLister); !errors.Is(got, tc.expectErr) { + if got := validatePod(tc.pod, nodeLister, serviceLister, tc.serviceName, tc.isCustomEPS); !errors.Is(got, tc.expectErr) { t.Errorf("validatePod() = %t, expected %t\n", got, tc.expectErr) } }) @@ -2245,6 +2376,9 @@ func TestParseIPAddress(t *testing.T) { } func addPodsToLister(podLister cache.Indexer, endpointSlices []*discovery.EndpointSlice) { + testLabels := map[string]string{ + "run": "foo", + } // collect both ipv4 and ipv6 IP address for pods podToIPs := make(map[string][]v1.PodIP) podToNodeName := make(map[string]string) @@ -2265,6 +2399,7 @@ func addPodsToLister(podLister cache.Indexer, endpointSlices []*discovery.Endpoi ObjectMeta: metav1.ObjectMeta{ Namespace: podNamespace, Name: podName, + Labels: testLabels, }, Spec: v1.PodSpec{ NodeName: podToNodeName[pod], @@ -2352,6 +2487,7 @@ func getTestEndpointSlices(name, namespace string) []*discovery.EndpointSlice { port81 := int32(81) port8081 := int32(8081) protocolTCP := v1.ProtocolTCP + managedByController := "endpointslice-controller.k8s.io" return []*discovery.EndpointSlice{ { ObjectMeta: metav1.ObjectMeta{ @@ -2359,6 +2495,7 @@ func getTestEndpointSlices(name, namespace string) []*discovery.EndpointSlice { Namespace: namespace, Labels: map[string]string{ discovery.LabelServiceName: name, + discovery.LabelManagedBy: managedByController, }, }, AddressType: "IPv4", @@ -2428,6 +2565,7 @@ func getTestEndpointSlices(name, namespace string) []*discovery.EndpointSlice { Namespace: namespace, Labels: map[string]string{ discovery.LabelServiceName: name, + discovery.LabelManagedBy: managedByController, }, }, AddressType: "IPv4", @@ -2472,6 +2610,7 @@ func getTestEndpointSlices(name, namespace string) []*discovery.EndpointSlice { Namespace: namespace, Labels: map[string]string{ discovery.LabelServiceName: name, + discovery.LabelManagedBy: managedByController, }, }, AddressType: "IPv4", @@ -2516,6 +2655,7 @@ func getTestEndpointSlices(name, namespace string) []*discovery.EndpointSlice { Namespace: namespace, Labels: map[string]string{ discovery.LabelServiceName: name, + discovery.LabelManagedBy: managedByController, }, }, AddressType: discovery.AddressTypeIPv6, diff --git a/pkg/neg/types/sync_errors.go b/pkg/neg/types/sync_errors.go index 001e351efe..e49a1d8490 100644 --- a/pkg/neg/types/sync_errors.go +++ b/pkg/neg/types/sync_errors.go @@ -34,6 +34,8 @@ const ( ReasonInvalidEPDetach = Reason("InvalidEPDetach") ReasonEPIPNotFromPod = Reason("EPIPNotFromPod") ReasonEPIPOutOfPodCIDR = Reason("EPIPOutOfPodCIDR") + ReasonEPServiceNotFound = Reason("EPServiceNotFound") + ReasonEPPodLabelMismatch = Reason("EPPodLabelMismatch") // these are for non error-state error ReasonNegNotFound = Reason("NegNotFound") @@ -124,6 +126,16 @@ var ( Reason: ReasonEPIPOutOfPodCIDR, IsErrorState: true, } + ErrEPServiceNotFound = NegSyncError{ + Err: errors.New("endpoint corresponds to a non-existent service"), + Reason: ReasonEPServiceNotFound, + IsErrorState: true, + } + ErrEPPodLabelMismatch = NegSyncError{ + Err: errors.New("endpoint corresponds to a pod with labels not matching to its service"), + Reason: ReasonEPPodLabelMismatch, + IsErrorState: true, + } ErrNegNotFound = NegSyncError{ Err: errors.New("failed to get NEG for service"),