diff --git a/deployments/helm-chart/templates/rbac.yaml b/deployments/helm-chart/templates/rbac.yaml index ad95710a86..f1070842ec 100644 --- a/deployments/helm-chart/templates/rbac.yaml +++ b/deployments/helm-chart/templates/rbac.yaml @@ -30,11 +30,18 @@ rules: - watch - list {{- end }} +- apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - get + - list + - watch - apiGroups: - "" resources: - services - - endpoints verbs: - get - list diff --git a/deployments/rbac/rbac.yaml b/deployments/rbac/rbac.yaml index 0501d9f04e..af4b44e6eb 100644 --- a/deployments/rbac/rbac.yaml +++ b/deployments/rbac/rbac.yaml @@ -3,11 +3,18 @@ apiVersion: rbac.authorization.k8s.io/v1 metadata: name: nginx-ingress rules: +- apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - get + - list + - watch - apiGroups: - "" resources: - services - - endpoints verbs: - get - list diff --git a/internal/k8s/controller.go b/internal/k8s/controller.go index 4b34be3057..6c242739dc 100644 --- a/internal/k8s/controller.go +++ b/internal/k8s/controller.go @@ -53,6 +53,7 @@ import ( "github.com/nginxinc/kubernetes-ingress/internal/metrics/collectors" api_v1 "k8s.io/api/core/v1" + discovery_v1 "k8s.io/api/discovery/v1" networking "k8s.io/api/networking/v1" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -332,7 +333,7 @@ type namespacedInformer struct { dynInformerFactory dynamicinformer.DynamicSharedInformerFactory ingressLister storeToIngressLister svcLister cache.Store - endpointLister storeToEndpointLister + endpointSliceLister storeToEndpointSliceLister podLister indexerToPodLister secretLister cache.Store virtualServerLister cache.Store @@ -359,7 +360,7 @@ func (lbc *LoadBalancerController) newNamespacedInformer(ns string) { // create handlers for resources we care about lbc.addIngressHandler(createIngressHandlers(lbc), nsi) lbc.addServiceHandler(createServiceHandlers(lbc), nsi) - lbc.addEndpointHandler(createEndpointHandlers(lbc), nsi) + lbc.addEndpointSliceHandler(createEndpointSliceHandlers(lbc), nsi) lbc.addPodHandler(nsi) secretsTweakListOptionsFunc := func(options *meta_v1.ListOptions) { @@ -512,13 +513,13 @@ func (lbc *LoadBalancerController) addIngressHandler(handlers cache.ResourceEven lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) } -// addEndpointHandler adds the handler for endpoints to the controller -func (lbc *LoadBalancerController) addEndpointHandler(handlers cache.ResourceEventHandlerFuncs, nsi *namespacedInformer) { - informer := nsi.sharedInformerFactory.Core().V1().Endpoints().Informer() +// addEndpointSliceHandler adds the handler for EndpointSlices to the controller +func (lbc *LoadBalancerController) addEndpointSliceHandler(handlers cache.ResourceEventHandlerFuncs, nsi *namespacedInformer) { + informer := nsi.sharedInformerFactory.Discovery().V1().EndpointSlices().Informer() informer.AddEventHandler(handlers) - var el storeToEndpointLister + var el storeToEndpointSliceLister el.Store = informer.GetStore() - nsi.endpointLister = el + nsi.endpointSliceLister = el lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced) } @@ -697,60 +698,60 @@ func (lbc *LoadBalancerController) getNamespacedInformer(ns string) *namespacedI return nsi } -func (lbc *LoadBalancerController) syncEndpoints(task task) { +func (lbc *LoadBalancerController) syncEndpointSlices(task task) { key := task.Key var obj interface{} - var endpExists bool + var endpointSliceExists bool var err error - glog.V(3).Infof("Syncing endpoints %v", key) + glog.V(3).Infof("Syncing EndpointSlices %v", key) ns, _, _ := cache.SplitMetaNamespaceKey(key) - obj, endpExists, err = lbc.getNamespacedInformer(ns).endpointLister.GetByKey(key) + obj, endpointSliceExists, err = lbc.getNamespacedInformer(ns).endpointSliceLister.GetByKey(key) if err != nil { lbc.syncQueue.Requeue(task, err) return } - if !endpExists { + if !endpointSliceExists { return } - endp := obj.(*api_v1.Endpoints) - resources := lbc.configuration.FindResourcesForEndpoints(endp.Namespace, endp.Name) + endpointSlice := obj.(*discovery_v1.EndpointSlice) + svcResource := lbc.configuration.FindResourcesForService(endpointSlice.Namespace, endpointSlice.Labels["kubernetes.io/service-name"]) - resourceExes := lbc.createExtendedResources(resources) + resourceExes := lbc.createExtendedResources(svcResource) if len(resourceExes.IngressExes) > 0 { - glog.V(3).Infof("Updating Endpoints for %v", resourceExes.IngressExes) + glog.V(3).Infof("Updating EndpointSlices for %v", resourceExes.IngressExes) err = lbc.configurator.UpdateEndpoints(resourceExes.IngressExes) if err != nil { - glog.Errorf("Error updating endpoints for %v: %v", resourceExes.IngressExes, err) + glog.Errorf("Error updating EndpointSlices for %v: %v", resourceExes.IngressExes, err) } } if len(resourceExes.MergeableIngresses) > 0 { - glog.V(3).Infof("Updating Endpoints for %v", resourceExes.MergeableIngresses) + glog.V(3).Infof("Updating EndpointSlices for %v", resourceExes.MergeableIngresses) err = lbc.configurator.UpdateEndpointsMergeableIngress(resourceExes.MergeableIngresses) if err != nil { - glog.Errorf("Error updating endpoints for %v: %v", resourceExes.MergeableIngresses, err) + glog.Errorf("Error updating EndpointSlices for %v: %v", resourceExes.MergeableIngresses, err) } } if lbc.areCustomResourcesEnabled { if len(resourceExes.VirtualServerExes) > 0 { - glog.V(3).Infof("Updating endpoints for %v", resourceExes.VirtualServerExes) + glog.V(3).Infof("Updating EndpointSlices for %v", resourceExes.VirtualServerExes) err := lbc.configurator.UpdateEndpointsForVirtualServers(resourceExes.VirtualServerExes) if err != nil { - glog.Errorf("Error updating endpoints for %v: %v", resourceExes.VirtualServerExes, err) + glog.Errorf("Error updating EndpointSlices for %v: %v", resourceExes.VirtualServerExes, err) } } if len(resourceExes.TransportServerExes) > 0 { - glog.V(3).Infof("Updating endpoints for %v", resourceExes.TransportServerExes) + glog.V(3).Infof("Updating EndpointSlices for %v", resourceExes.TransportServerExes) err := lbc.configurator.UpdateEndpointsForTransportServers(resourceExes.TransportServerExes) if err != nil { - glog.Errorf("Error updating endpoints for %v: %v", resourceExes.TransportServerExes, err) + glog.Errorf("Error updating EndpointSlices for %v: %v", resourceExes.TransportServerExes, err) } } } @@ -905,8 +906,8 @@ func (lbc *LoadBalancerController) sync(task task) { lbc.updateTransportServerMetrics() case configMap: lbc.syncConfigMap(task) - case endpoints: - lbc.syncEndpoints(task) + case endpointslice: + lbc.syncEndpointSlices(task) case secret: lbc.syncSecret(task) case service: @@ -3280,43 +3281,54 @@ func (lbc *LoadBalancerController) getEndpointsForServiceWithSubselector(targetP return nil, fmt.Errorf("error getting pods in namespace %v that match the selector %v: %w", svc.Namespace, labels.Merge(svc.Spec.Selector, subselector), err) } - var svcEps api_v1.Endpoints - svcEps, err = nsi.endpointLister.GetServiceEndpoints(svc) + var svcEndpointSlices []discovery_v1.EndpointSlice + svcEndpointSlices, err = nsi.endpointSliceLister.GetServiceEndpointSlices(svc) if err != nil { - glog.V(3).Infof("Error getting endpoints for service %s from the cache: %v", svc.Name, err) + glog.V(3).Infof("Error getting endpointslices for service %s from the cache: %v", svc.Name, err) return nil, err } - endps = getEndpointsBySubselectedPods(targetPort, pods, svcEps) + endps = getEndpointsFromEndpointSlicesForSubselectedPods(targetPort, pods, svcEndpointSlices) return endps, nil } -func getEndpointsBySubselectedPods(targetPort int32, pods []*api_v1.Pod, svcEps api_v1.Endpoints) (endps []podEndpoint) { +func getEndpointsFromEndpointSlicesForSubselectedPods(targetPort int32, pods []*api_v1.Pod, svcEndpointSlices []discovery_v1.EndpointSlice) (podEndpoints []podEndpoint) { + endpointSet := make(map[podEndpoint]struct{}) for _, pod := range pods { - for _, subset := range svcEps.Subsets { - for _, port := range subset.Ports { - if port.Port != targetPort { + for _, endpointSlice := range svcEndpointSlices { + for _, port := range endpointSlice.Ports { + if *port.Port != targetPort { continue } - for _, address := range subset.Addresses { - if address.IP == pod.Status.PodIP { - addr := ipv6SafeAddrPort(pod.Status.PodIP, targetPort) - ownerType, ownerName := getPodOwnerTypeAndName(pod) - podEnd := podEndpoint{ - Address: addr, - PodName: getPodName(address.TargetRef), - MeshPodOwner: configs.MeshPodOwner{ - OwnerType: ownerType, - OwnerName: ownerName, - }, + for _, endpoint := range endpointSlice.Endpoints { + for _, address := range endpoint.Addresses { + if pod.Status.PodIP == address { + addr := ipv6SafeAddrPort(pod.Status.PodIP, targetPort) + ownerType, ownerName := getPodOwnerTypeAndName(pod) + podEndpoint := podEndpoint{ + Address: addr, + PodName: getPodName(endpoint.TargetRef), + MeshPodOwner: configs.MeshPodOwner{ + OwnerType: ownerType, + OwnerName: ownerName, + }, + } + endpointSet[podEndpoint] = struct{}{} + podEndpoints = append(podEndpoints, podEndpoint) } - endps = append(endps, podEnd) } } } } } - return endps + if len(endpointSet) == 0 { + return nil + } + endpoints := make([]podEndpoint, 0, len(endpointSet)) + for ep := range endpointSet { + endpoints = append(endpoints, ep) + } + return endpoints } func ipv6SafeAddrPort(addr string, port int32) string { @@ -3394,8 +3406,8 @@ func (lbc *LoadBalancerController) getExternalEndpointsForIngressBackend(backend } func (lbc *LoadBalancerController) getEndpointsForIngressBackend(backend *networking.IngressBackend, svc *api_v1.Service) (result []podEndpoint, isExternal bool, err error) { - var endps api_v1.Endpoints - endps, err = lbc.getNamespacedInformer(svc.Namespace).endpointLister.GetServiceEndpoints(svc) + var endpointSlices []discovery_v1.EndpointSlice + endpointSlices, err = lbc.getNamespacedInformer(svc.Namespace).endpointSliceLister.GetServiceEndpointSlices(svc) if err != nil { if svc.Spec.Type == api_v1.ServiceTypeExternalName { @@ -3409,15 +3421,15 @@ func (lbc *LoadBalancerController) getEndpointsForIngressBackend(backend *networ return nil, false, err } - result, err = lbc.getEndpointsForPort(endps, backend.Service.Port, svc) + result, err = lbc.getEndpointsForPortFromEndpointSlices(endpointSlices, backend.Service.Port, svc) if err != nil { - glog.V(3).Infof("Error getting endpoints for service %s port %v: %v", svc.Name, configs.GetBackendPortAsString(backend.Service.Port), err) + glog.V(3).Infof("Error getting endpointslices for service %s port %v: %v", svc.Name, configs.GetBackendPortAsString(backend.Service.Port), err) return nil, false, err } return result, false, nil } -func (lbc *LoadBalancerController) getEndpointsForPort(endps api_v1.Endpoints, backendPort networking.ServiceBackendPort, svc *api_v1.Service) ([]podEndpoint, error) { +func (lbc *LoadBalancerController) getEndpointsForPortFromEndpointSlices(endpointSlices []discovery_v1.EndpointSlice, backendPort networking.ServiceBackendPort, svc *api_v1.Service) ([]podEndpoint, error) { var targetPort int32 var err error @@ -3435,29 +3447,36 @@ func (lbc *LoadBalancerController) getEndpointsForPort(endps api_v1.Endpoints, b return nil, fmt.Errorf("no port %v in service %s", backendPort, svc.Name) } - for _, subset := range endps.Subsets { - for _, port := range subset.Ports { - if port.Port == targetPort { - var endpoints []podEndpoint - for _, address := range subset.Addresses { - addr := ipv6SafeAddrPort(address.IP, port.Port) - podEnd := podEndpoint{ - Address: addr, - } - if address.TargetRef != nil { - parentType, parentName := lbc.getPodOwnerTypeAndNameFromAddress(address.TargetRef.Namespace, address.TargetRef.Name) - podEnd.OwnerType = parentType - podEnd.OwnerName = parentName - podEnd.PodName = address.TargetRef.Name + endpointSet := make(map[podEndpoint]struct{}) + for _, endpointSlice := range endpointSlices { + for _, endpointSlicePort := range endpointSlice.Ports { + if *endpointSlicePort.Port == targetPort { + for _, endpoint := range endpointSlice.Endpoints { + for _, endpointAddress := range endpoint.Addresses { + address := ipv6SafeAddrPort(endpointAddress, *endpointSlicePort.Port) + podEndpoint := podEndpoint{ + Address: address, + } + if endpoint.TargetRef != nil { + parentType, parentName := lbc.getPodOwnerTypeAndNameFromAddress(endpoint.TargetRef.Namespace, endpoint.TargetRef.Name) + podEndpoint.OwnerType = parentType + podEndpoint.OwnerName = parentName + podEndpoint.PodName = endpoint.TargetRef.Name + } + endpointSet[podEndpoint] = struct{}{} } - endpoints = append(endpoints, podEnd) } - return endpoints, nil } } } - - return nil, fmt.Errorf("no endpoints for target port %v in service %s", targetPort, svc.Name) + if len(endpointSet) == 0 { + return nil, fmt.Errorf("no endpointslices for target port %v in service %s", targetPort, svc.Name) + } + endpoints := make([]podEndpoint, 0, len(endpointSet)) + for ep := range endpointSet { + endpoints = append(endpoints, ep) + } + return endpoints, nil } func (lbc *LoadBalancerController) getPodOwnerTypeAndNameFromAddress(ns, name string) (parentType, parentName string) { diff --git a/internal/k8s/controller_test.go b/internal/k8s/controller_test.go index 37c9ae02c2..4a308c27c1 100644 --- a/internal/k8s/controller_test.go +++ b/internal/k8s/controller_test.go @@ -8,6 +8,8 @@ import ( "strings" "testing" + discovery_v1 "k8s.io/api/discovery/v1" + "github.com/google/go-cmp/cmp" "github.com/nginxinc/kubernetes-ingress/internal/configs" "github.com/nginxinc/kubernetes-ingress/internal/configs/version1" @@ -470,63 +472,789 @@ func TestFormatWarningsMessages(t *testing.T) { } } -func TestGetEndpointsBySubselectedPods(t *testing.T) { +func TestGetEndpointsFromEndpointSlices_DuplicateEndpointsInOneEndpointSlice(t *testing.T) { + endpointPort := int32(8080) + + lbc := LoadBalancerController{ + isNginxPlus: true, + } + + backendServicePort := networking.ServiceBackendPort{ + Number: 8080, + Name: "foo", + } + + tests := []struct { + desc string + svc api_v1.Service + svcEndpointSlices []discovery_v1.EndpointSlice + expectedEndpoints []podEndpoint + }{ + { + desc: "duplicate endpoints in an endpointslice", + svc: api_v1.Service{ + TypeMeta: meta_v1.TypeMeta{}, + ObjectMeta: meta_v1.ObjectMeta{ + Name: "coffee-svc", + Namespace: "default", + }, + Spec: api_v1.ServiceSpec{ + Ports: []api_v1.ServicePort{ + { + Name: "foo", + Port: 80, + TargetPort: intstr.FromInt(8080), + }, + }, + }, + Status: api_v1.ServiceStatus{}, + }, + expectedEndpoints: []podEndpoint{ + { + Address: "1.2.3.4:8080", + }, + }, + svcEndpointSlices: []discovery_v1.EndpointSlice{ + { + Ports: []discovery_v1.EndpointPort{ + { + Port: &endpointPort, + }, + }, + Endpoints: []discovery_v1.Endpoint{ + { + Addresses: []string{ + "1.2.3.4", + }, + }, + { + Addresses: []string{ + "1.2.3.4", + }, + }, + }, + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + gotEndpoints, err := lbc.getEndpointsForPortFromEndpointSlices(test.svcEndpointSlices, backendServicePort, &test.svc) + if err != nil { + t.Fatal(err) + } + if result := unorderedEqual(gotEndpoints, test.expectedEndpoints); !result { + t.Errorf("lbc.getEndpointsForPortFromEndpointSlices() got %v, want %v", + gotEndpoints, test.expectedEndpoints) + } + }) + } +} + +func TestGetEndpointsFromEndpointSlices_TwoDifferentEndpointsInOnEndpointSlice(t *testing.T) { + endpointPort := int32(8080) + + lbc := LoadBalancerController{ + isNginxPlus: true, + } + + backendServicePort := networking.ServiceBackendPort{ + Number: 8080, + Name: "foo", + } + + tests := []struct { + desc string + svc api_v1.Service + svcEndpointSlices []discovery_v1.EndpointSlice + expectedEndpoints []podEndpoint + }{ + { + desc: "two different endpoints in one endpoint slice", + svc: api_v1.Service{ + TypeMeta: meta_v1.TypeMeta{}, + ObjectMeta: meta_v1.ObjectMeta{ + Name: "coffee-svc", + Namespace: "default", + }, + Spec: api_v1.ServiceSpec{ + Ports: []api_v1.ServicePort{ + { + Name: "foo", + Port: 80, + TargetPort: intstr.FromInt(8080), + }, + }, + }, + Status: api_v1.ServiceStatus{}, + }, + expectedEndpoints: []podEndpoint{ + { + Address: "1.2.3.4:8080", + }, + { + Address: "5.6.7.8:8080", + }, + }, + svcEndpointSlices: []discovery_v1.EndpointSlice{ + { + Ports: []discovery_v1.EndpointPort{ + { + Port: &endpointPort, + }, + }, + Endpoints: []discovery_v1.Endpoint{ + { + Addresses: []string{ + "1.2.3.4", + }, + }, + { + Addresses: []string{ + "5.6.7.8", + }, + }, + }, + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + gotEndpoints, err := lbc.getEndpointsForPortFromEndpointSlices(test.svcEndpointSlices, backendServicePort, &test.svc) + if err != nil { + t.Fatal(err) + } + if result := unorderedEqual(gotEndpoints, test.expectedEndpoints); !result { + t.Errorf("lbc.getEndpointsForPortFromEndpointSlices() got %v, want %v", + gotEndpoints, test.expectedEndpoints) + } + }) + } +} + +func TestGetEndpointsFromEndpointSlices_DuplicateEndpointsAcrossTwoEndpointSlices(t *testing.T) { + endpointPort := int32(8080) + + lbc := LoadBalancerController{ + isNginxPlus: true, + } + + backendServicePort := networking.ServiceBackendPort{ + Number: 8080, + Name: "foo", + } + + tests := []struct { + desc string + svc api_v1.Service + svcEndpointSlices []discovery_v1.EndpointSlice + expectedEndpoints []podEndpoint + }{ + { + desc: "duplicate endpoints across two endpointslices", + svc: api_v1.Service{ + TypeMeta: meta_v1.TypeMeta{}, + ObjectMeta: meta_v1.ObjectMeta{ + Name: "coffee-svc", + Namespace: "default", + }, + Spec: api_v1.ServiceSpec{ + Ports: []api_v1.ServicePort{ + { + Name: "foo", + Port: 80, + TargetPort: intstr.FromInt(8080), + }, + }, + }, + Status: api_v1.ServiceStatus{}, + }, + expectedEndpoints: []podEndpoint{ + { + Address: "1.2.3.4:8080", + }, + { + Address: "5.6.7.8:8080", + }, + { + Address: "10.0.0.1:8080", + }, + }, + svcEndpointSlices: []discovery_v1.EndpointSlice{ + { + Ports: []discovery_v1.EndpointPort{ + { + Port: &endpointPort, + }, + }, + Endpoints: []discovery_v1.Endpoint{ + { + Addresses: []string{ + "1.2.3.4", + }, + }, + { + Addresses: []string{ + "5.6.7.8", + }, + }, + }, + }, + { + Ports: []discovery_v1.EndpointPort{ + { + Port: &endpointPort, + }, + }, + Endpoints: []discovery_v1.Endpoint{ + { + Addresses: []string{ + "1.2.3.4", + }, + }, + { + Addresses: []string{ + "10.0.0.1", + }, + }, + }, + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + gotEndpoints, err := lbc.getEndpointsForPortFromEndpointSlices(test.svcEndpointSlices, backendServicePort, &test.svc) + if err != nil { + t.Fatal(err) + } + if result := unorderedEqual(gotEndpoints, test.expectedEndpoints); !result { + t.Errorf("lbc.getEndpointsForPortFromEndpointSlices() got %v, want %v", + gotEndpoints, test.expectedEndpoints) + } + }) + } +} + +func TestGetEndpointsFromEndpointSlices_ErrorsOnInvalidTargetPort(t *testing.T) { + endpointPort := int32(8080) + + lbc := LoadBalancerController{ + isNginxPlus: true, + } + + backendServicePort := networking.ServiceBackendPort{ + Number: 8080, + Name: "foo", + } + + tests := []struct { + desc string + svc api_v1.Service + svcEndpointSlices []discovery_v1.EndpointSlice + }{ + { + desc: "Target Port should be 0", + svc: api_v1.Service{ + TypeMeta: meta_v1.TypeMeta{}, + ObjectMeta: meta_v1.ObjectMeta{ + Name: "coffee-svc", + Namespace: "default", + }, + Spec: api_v1.ServiceSpec{ + Ports: []api_v1.ServicePort{ + { + Name: "foo", + Port: 0, + TargetPort: intstr.FromInt(0), + }, + }, + }, + Status: api_v1.ServiceStatus{}, + }, + svcEndpointSlices: []discovery_v1.EndpointSlice{ + { + Ports: []discovery_v1.EndpointPort{ + { + Port: &endpointPort, + }, + }, + Endpoints: []discovery_v1.Endpoint{ + { + Addresses: []string{ + "1.2.3.4", + }, + }, + { + Addresses: []string{ + "5.6.7.8", + }, + }, + }, + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + _, err := lbc.getEndpointsForPortFromEndpointSlices(test.svcEndpointSlices, backendServicePort, &test.svc) + if err == nil { + t.Logf("%s but was %+v\n", test.desc, test.svc.Spec.Ports[0].TargetPort.IntVal) + t.Fatal("want error, got nil") + } + }) + } +} + +func TestGetEndpointsFromEndpointSlices_ErrorsOnNoEndpointSlicesFound(t *testing.T) { + lbc := LoadBalancerController{ + isNginxPlus: true, + } + + backendServicePort := networking.ServiceBackendPort{ + Number: 8080, + Name: "foo", + } + + tests := []struct { + desc string + svc api_v1.Service + svcEndpointSlices []discovery_v1.EndpointSlice + }{ + { + desc: "No EndpointSlices should be found", + svc: api_v1.Service{ + TypeMeta: meta_v1.TypeMeta{}, + ObjectMeta: meta_v1.ObjectMeta{ + Name: "coffee-svc", + Namespace: "default", + }, + Spec: api_v1.ServiceSpec{ + Ports: []api_v1.ServicePort{ + { + Name: "foo", + Port: 80, + TargetPort: intstr.FromInt(8080), + }, + }, + }, + Status: api_v1.ServiceStatus{}, + }, + svcEndpointSlices: []discovery_v1.EndpointSlice{}, + }, + } + + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + _, err := lbc.getEndpointsForPortFromEndpointSlices(test.svcEndpointSlices, backendServicePort, &test.svc) + if err == nil { + t.Logf("%s but got %+v\n", test.desc, test.svcEndpointSlices) + t.Fatal("want error, got nil") + } + }) + } +} + +func TestGetEndpointSlicesBySubselectedPods_FindOnePodInOneEndpointSlice(t *testing.T) { + endpointPort := int32(8080) + + boolPointer := func(b bool) *bool { return &b } + tests := []struct { + desc string + targetPort int32 + svcEndpointSlices []discovery_v1.EndpointSlice + pods []*api_v1.Pod + expectedEndpoints []podEndpoint + }{ + { + desc: "find one pod in one endpointslice", + targetPort: 8080, + expectedEndpoints: []podEndpoint{ + { + Address: "1.2.3.4:8080", + MeshPodOwner: configs.MeshPodOwner{ + OwnerType: "deployment", + OwnerName: "deploy-1", + }, + }, + }, + pods: []*api_v1.Pod{ + { + ObjectMeta: meta_v1.ObjectMeta{ + OwnerReferences: []meta_v1.OwnerReference{ + { + Kind: "Deployment", + Name: "deploy-1", + Controller: boolPointer(true), + }, + }, + }, + Status: api_v1.PodStatus{ + PodIP: "1.2.3.4", + }, + }, + }, + svcEndpointSlices: []discovery_v1.EndpointSlice{ + { + Ports: []discovery_v1.EndpointPort{ + { + Port: &endpointPort, + }, + }, + Endpoints: []discovery_v1.Endpoint{ + { + Addresses: []string{ + "1.2.3.4", + }, + }, + }, + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + gotEndpoints := getEndpointsFromEndpointSlicesForSubselectedPods(test.targetPort, test.pods, test.svcEndpointSlices) + + if result := unorderedEqual(gotEndpoints, test.expectedEndpoints); !result { + t.Errorf("getEndpointsFromEndpointSlicesForSubselectedPods() = got %v, want %v", gotEndpoints, test.expectedEndpoints) + } + }) + } +} + +func TestGetEndpointSlicesBySubselectedPods_FindOnePodInTwoEndpointSlicesWithDuplicateEndpoints(t *testing.T) { + endpointPort := int32(8080) + boolPointer := func(b bool) *bool { return &b } tests := []struct { - desc string - targetPort int32 - svcEps api_v1.Endpoints - expectedEps []podEndpoint + desc string + targetPort int32 + svcEndpointSlices []discovery_v1.EndpointSlice + pods []*api_v1.Pod + expectedEndpoints []podEndpoint }{ { - desc: "find one endpoint", - targetPort: 80, - expectedEps: []podEndpoint{ + desc: "find one pod in two endpointslices with duplicate endpoints", + targetPort: 8080, + expectedEndpoints: []podEndpoint{ { - Address: "1.2.3.4:80", + Address: "1.2.3.4:8080", MeshPodOwner: configs.MeshPodOwner{ OwnerType: "deployment", OwnerName: "deploy-1", }, }, }, + pods: []*api_v1.Pod{ + { + ObjectMeta: meta_v1.ObjectMeta{ + OwnerReferences: []meta_v1.OwnerReference{ + { + Kind: "Deployment", + Name: "deploy-1", + Controller: boolPointer(true), + }, + }, + }, + Status: api_v1.PodStatus{ + PodIP: "1.2.3.4", + }, + }, + }, + svcEndpointSlices: []discovery_v1.EndpointSlice{ + { + Ports: []discovery_v1.EndpointPort{ + { + Port: &endpointPort, + }, + }, + Endpoints: []discovery_v1.Endpoint{ + { + Addresses: []string{ + "1.2.3.4", + }, + }, + }, + }, + { + Ports: []discovery_v1.EndpointPort{ + { + Port: &endpointPort, + }, + }, + Endpoints: []discovery_v1.Endpoint{ + { + Addresses: []string{ + "1.2.3.4", + }, + }, + }, + }, + }, }, + } + + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + gotEndpoints := getEndpointsFromEndpointSlicesForSubselectedPods(test.targetPort, test.pods, test.svcEndpointSlices) + + if result := unorderedEqual(gotEndpoints, test.expectedEndpoints); !result { + t.Errorf("getEndpointsFromEndpointSlicesForSubselectedPods() = got %v, want %v", gotEndpoints, test.expectedEndpoints) + } + }) + } +} + +func TestGetEndpointSlicesBySubselectedPods_FindTwoPodsInOneEndpointSlice(t *testing.T) { + endpointPort := int32(8080) + + boolPointer := func(b bool) *bool { return &b } + tests := []struct { + desc string + targetPort int32 + svcEndpointSlices []discovery_v1.EndpointSlice + pods []*api_v1.Pod + expectedEndpoints []podEndpoint + }{ { - desc: "targetPort mismatch", - targetPort: 21, - expectedEps: nil, + desc: "find two pods in one endpointslice", + targetPort: 8080, + expectedEndpoints: []podEndpoint{ + { + Address: "1.2.3.4:8080", + MeshPodOwner: configs.MeshPodOwner{ + OwnerType: "deployment", + OwnerName: "deploy-1", + }, + }, + { + Address: "5.6.7.8:8080", + MeshPodOwner: configs.MeshPodOwner{ + OwnerType: "deployment", + OwnerName: "deploy-1", + }, + }, + }, + pods: []*api_v1.Pod{ + { + ObjectMeta: meta_v1.ObjectMeta{ + OwnerReferences: []meta_v1.OwnerReference{ + { + Kind: "Deployment", + Name: "deploy-1", + Controller: boolPointer(true), + }, + }, + }, + Status: api_v1.PodStatus{ + PodIP: "1.2.3.4", + }, + }, + { + ObjectMeta: meta_v1.ObjectMeta{ + OwnerReferences: []meta_v1.OwnerReference{ + { + Kind: "Deployment", + Name: "deploy-1", + Controller: boolPointer(true), + }, + }, + }, + Status: api_v1.PodStatus{ + PodIP: "5.6.7.8", + }, + }, + }, + svcEndpointSlices: []discovery_v1.EndpointSlice{ + { + Ports: []discovery_v1.EndpointPort{ + { + Port: &endpointPort, + }, + }, + Endpoints: []discovery_v1.Endpoint{ + { + Addresses: []string{ + "1.2.3.4", + }, + }, + { + Addresses: []string{ + "5.6.7.8", + }, + }, + }, + }, + }, }, } - pods := []*api_v1.Pod{ + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + gotEndpoints := getEndpointsFromEndpointSlicesForSubselectedPods(test.targetPort, test.pods, test.svcEndpointSlices) + + if result := unorderedEqual(gotEndpoints, test.expectedEndpoints); !result { + t.Errorf("getEndpointsFromEndpointSlicesForSubselectedPods() = got %v, want %v", gotEndpoints, test.expectedEndpoints) + } + }) + } +} + +func TestGetEndpointSlicesBySubselectedPods_FindTwoPodsInTwoEndpointSlices(t *testing.T) { + endpointPort := int32(8080) + + boolPointer := func(b bool) *bool { return &b } + tests := []struct { + desc string + targetPort int32 + svcEndpointSlices []discovery_v1.EndpointSlice + pods []*api_v1.Pod + expectedEndpoints []podEndpoint + }{ { - ObjectMeta: meta_v1.ObjectMeta{ - OwnerReferences: []meta_v1.OwnerReference{ - { - Kind: "Deployment", - Name: "deploy-1", - Controller: boolPointer(true), + desc: "find two pods in two endpointslices", + targetPort: 8080, + expectedEndpoints: []podEndpoint{ + { + Address: "1.2.3.4:8080", + MeshPodOwner: configs.MeshPodOwner{ + OwnerType: "deployment", + OwnerName: "deploy-1", + }, + }, + { + Address: "5.6.7.8:8080", + MeshPodOwner: configs.MeshPodOwner{ + OwnerType: "deployment", + OwnerName: "deploy-1", }, }, }, - Status: api_v1.PodStatus{ - PodIP: "1.2.3.4", + pods: []*api_v1.Pod{ + { + ObjectMeta: meta_v1.ObjectMeta{ + OwnerReferences: []meta_v1.OwnerReference{ + { + Kind: "Deployment", + Name: "deploy-1", + Controller: boolPointer(true), + }, + }, + }, + Status: api_v1.PodStatus{ + PodIP: "1.2.3.4", + }, + }, + { + ObjectMeta: meta_v1.ObjectMeta{ + OwnerReferences: []meta_v1.OwnerReference{ + { + Kind: "Deployment", + Name: "deploy-1", + Controller: boolPointer(true), + }, + }, + }, + Status: api_v1.PodStatus{ + PodIP: "5.6.7.8", + }, + }, + }, + svcEndpointSlices: []discovery_v1.EndpointSlice{ + { + Ports: []discovery_v1.EndpointPort{ + { + Port: &endpointPort, + }, + }, + Endpoints: []discovery_v1.Endpoint{ + { + Addresses: []string{ + "1.2.3.4", + }, + }, + }, + }, + { + Ports: []discovery_v1.EndpointPort{ + { + Port: &endpointPort, + }, + }, + Endpoints: []discovery_v1.Endpoint{ + { + Addresses: []string{ + "5.6.7.8", + }, + }, + }, + }, }, }, } - svcEps := api_v1.Endpoints{ - Subsets: []api_v1.EndpointSubset{ - { - Addresses: []api_v1.EndpointAddress{ - { - IP: "1.2.3.4", - Hostname: "asdf.com", + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + gotEndpoints := getEndpointsFromEndpointSlicesForSubselectedPods(test.targetPort, test.pods, test.svcEndpointSlices) + + if result := unorderedEqual(gotEndpoints, test.expectedEndpoints); !result { + t.Errorf("getEndpointsFromEndpointSlicesForSubselectedPods() = got %v, want %v", gotEndpoints, test.expectedEndpoints) + } + }) + } +} + +func TestGetEndpointSlicesBySubselectedPods_FindNoPods(t *testing.T) { + endpointPort := int32(8080) + + boolPointer := func(b bool) *bool { return &b } + tests := []struct { + desc string + targetPort int32 + svcEndpointSlices []discovery_v1.EndpointSlice + pods []*api_v1.Pod + expectedEndpoints []podEndpoint + }{ + { + desc: "find no pods", + targetPort: 8080, + expectedEndpoints: nil, + pods: []*api_v1.Pod{ + { + ObjectMeta: meta_v1.ObjectMeta{ + OwnerReferences: []meta_v1.OwnerReference{ + { + Kind: "Deployment", + Name: "deploy-1", + Controller: boolPointer(true), + }, + }, + }, + Status: api_v1.PodStatus{ + PodIP: "1.2.3.4", }, }, - Ports: []api_v1.EndpointPort{ - { - Port: 80, + }, + svcEndpointSlices: []discovery_v1.EndpointSlice{ + { + Ports: []discovery_v1.EndpointPort{ + { + Port: &endpointPort, + }, + }, + Endpoints: []discovery_v1.Endpoint{ + { + Addresses: []string{ + "5.4.3.2", + }, + }, }, }, }, @@ -535,14 +1263,92 @@ func TestGetEndpointsBySubselectedPods(t *testing.T) { for _, test := range tests { t.Run(test.desc, func(t *testing.T) { - gotEndps := getEndpointsBySubselectedPods(test.targetPort, pods, svcEps) - if !reflect.DeepEqual(gotEndps, test.expectedEps) { - t.Errorf("getEndpointsBySubselectedPods() = %v, want %v", gotEndps, test.expectedEps) + gotEndpoints := getEndpointsFromEndpointSlicesForSubselectedPods(test.targetPort, test.pods, test.svcEndpointSlices) + + if result := unorderedEqual(gotEndpoints, test.expectedEndpoints); !result { + t.Errorf("getEndpointsFromEndpointSlicesForSubselectedPods() = got %v, want %v", gotEndpoints, test.expectedEndpoints) } }) } } +func TestGetEndpointSlicesBySubselectedPods_TargetPortMismatch(t *testing.T) { + endpointPort := int32(8080) + + boolPointer := func(b bool) *bool { return &b } + tests := []struct { + desc string + targetPort int32 + svcEndpointSlices []discovery_v1.EndpointSlice + pods []*api_v1.Pod + expectedEndpoints []podEndpoint + }{ + { + desc: "targetPort mismatch", + targetPort: 21, + svcEndpointSlices: []discovery_v1.EndpointSlice{ + { + Ports: []discovery_v1.EndpointPort{ + { + Port: &endpointPort, + }, + }, + Endpoints: []discovery_v1.Endpoint{ + { + Addresses: []string{ + "1.2.3.4", + }, + }, + }, + }, + }, + pods: []*api_v1.Pod{ + { + ObjectMeta: meta_v1.ObjectMeta{ + OwnerReferences: []meta_v1.OwnerReference{ + { + Kind: "Deployment", + Name: "deploy-1", + Controller: boolPointer(true), + }, + }, + }, + Status: api_v1.PodStatus{ + PodIP: "1.2.3.4", + }, + }, + }, + expectedEndpoints: nil, + }, + } + + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + gotEndpoints := getEndpointsFromEndpointSlicesForSubselectedPods(test.targetPort, test.pods, test.svcEndpointSlices) + + if result := unorderedEqual(gotEndpoints, test.expectedEndpoints); !result { + t.Errorf("getEndpointsFromEndpointSlicesForSubselectedPods() = got %v, want %v", gotEndpoints, test.expectedEndpoints) + } + }) + } +} + +func unorderedEqual(got, want []podEndpoint) bool { + if len(got) != len(want) { + return false + } + exists := make(map[string]bool) + for _, value := range got { + exists[value.Address] = true + } + for _, value := range want { + if !exists[value.Address] { + return false + } + } + return true +} + func TestGetStatusFromEventTitle(t *testing.T) { tests := []struct { eventTitle string diff --git a/internal/k8s/handlers.go b/internal/k8s/handlers.go index 5cb40e271a..b98b373e07 100644 --- a/internal/k8s/handlers.go +++ b/internal/k8s/handlers.go @@ -5,6 +5,8 @@ import ( "reflect" "sort" + discovery_v1 "k8s.io/api/discovery/v1" + "github.com/nginxinc/kubernetes-ingress/pkg/apis/dos/v1beta1" "github.com/golang/glog" @@ -60,34 +62,33 @@ func createConfigMapHandlers(lbc *LoadBalancerController, name string) cache.Res } } -// createEndpointHandlers builds the handler funcs for endpoints -func createEndpointHandlers(lbc *LoadBalancerController) cache.ResourceEventHandlerFuncs { +// createEndpointSliceHandlers builds the handler funcs for EndpointSlices +func createEndpointSliceHandlers(lbc *LoadBalancerController) cache.ResourceEventHandlerFuncs { return cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - endpoint := obj.(*v1.Endpoints) - glog.V(3).Infof("Adding endpoints: %v", endpoint.Name) + endpointSlice := obj.(*discovery_v1.EndpointSlice) + glog.V(3).Infof("Adding EndpointSlice: %v", endpointSlice.Name) lbc.AddSyncQueue(obj) }, DeleteFunc: func(obj interface{}) { - endpoint, isEndpoint := obj.(*v1.Endpoints) - if !isEndpoint { + endpointSlice, isEndpointSlice := obj.(*discovery_v1.EndpointSlice) + if !isEndpointSlice { deletedState, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { glog.V(3).Infof("Error received unexpected object: %v", obj) return } - endpoint, ok = deletedState.Obj.(*v1.Endpoints) + endpointSlice, ok = deletedState.Obj.(*discovery_v1.EndpointSlice) if !ok { - glog.V(3).Infof("Error DeletedFinalStateUnknown contained non-Endpoints object: %v", deletedState.Obj) + glog.V(3).Infof("Error DeletedFinalStateUnknown contained non-EndpointSlice object: %v", deletedState.Obj) return } } - glog.V(3).Infof("Removing endpoints: %v", endpoint.Name) + glog.V(3).Infof("Removing EndpointSlice: %v", endpointSlice.Name) lbc.AddSyncQueue(obj) - }, - UpdateFunc: func(old, cur interface{}) { + }, UpdateFunc: func(old, cur interface{}) { if !reflect.DeepEqual(old, cur) { - glog.V(3).Infof("Endpoints %v changed, syncing", cur.(*v1.Endpoints).Name) + glog.V(3).Infof("EndpointSlice %v changed, syncing", cur.(*discovery_v1.EndpointSlice).Name) lbc.AddSyncQueue(cur) } }, diff --git a/internal/k8s/task_queue.go b/internal/k8s/task_queue.go index 021b415ed4..7406b4bbfd 100644 --- a/internal/k8s/task_queue.go +++ b/internal/k8s/task_queue.go @@ -12,6 +12,7 @@ import ( conf_v1 "github.com/nginxinc/kubernetes-ingress/pkg/apis/configuration/v1" conf_v1alpha1 "github.com/nginxinc/kubernetes-ingress/pkg/apis/configuration/v1alpha1" v1 "k8s.io/api/core/v1" + discovery_v1 "k8s.io/api/discovery/v1" networking "k8s.io/api/networking/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/util/wait" @@ -109,7 +110,7 @@ type kind int // resources const ( ingress = iota - endpoints + endpointslice configMap secret service @@ -139,8 +140,8 @@ func newTask(key string, obj interface{}) (task, error) { switch t := obj.(type) { case *networking.Ingress: k = ingress - case *v1.Endpoints: - k = endpoints + case *discovery_v1.EndpointSlice: + k = endpointslice case *v1.ConfigMap: k = configMap case *v1.Secret: diff --git a/internal/k8s/utils.go b/internal/k8s/utils.go index e6cc57fecf..3b8618054b 100644 --- a/internal/k8s/utils.go +++ b/internal/k8s/utils.go @@ -21,6 +21,8 @@ import ( "reflect" "strings" + discovery_v1 "k8s.io/api/discovery/v1" + v1 "k8s.io/api/core/v1" networking "k8s.io/api/networking/v1" @@ -37,7 +39,7 @@ type storeToIngressLister struct { cache.Store } -// GetByKeySafe calls Store.GetByKeySafe and returns a copy of the ingress so it is +// GetByKeySafe calls Store.GetByKeySafe and returns a copy of the ingress, so it is // safe to modify. func (s *storeToIngressLister) GetByKeySafe(key string) (ing *networking.Ingress, exists bool, err error) { item, exists, err := s.Store.GetByKey(key) @@ -69,7 +71,7 @@ func (s *storeToConfigMapLister) List() (cfgm v1.ConfigMapList, err error) { return cfgm, nil } -// indexerToPodLister makes a Indexer that lists Pods. +// indexerToPodLister makes an Indexer that lists Pods. type indexerToPodLister struct { cache.Indexer } @@ -82,20 +84,23 @@ func (ipl indexerToPodLister) ListByNamespace(ns string, selector labels.Selecto return pods, err } -// storeToEndpointLister makes a Store that lists Endpoints -type storeToEndpointLister struct { +// Store for EndpointSlices +type storeToEndpointSliceLister struct { cache.Store } -// GetServiceEndpoints returns the endpoints of a service, matched on service name. -func (s *storeToEndpointLister) GetServiceEndpoints(svc *v1.Service) (ep v1.Endpoints, err error) { - for _, m := range s.Store.List() { - ep = *m.(*v1.Endpoints) - if svc.Name == ep.Name && svc.Namespace == ep.Namespace { - return ep, nil +// GetServiceEndpointSlices returns the endpoints of a service, matched on service name. +func (s *storeToEndpointSliceLister) GetServiceEndpointSlices(svc *v1.Service) (endpointSlices []discovery_v1.EndpointSlice, err error) { + for _, epStore := range s.Store.List() { + ep := *epStore.(*discovery_v1.EndpointSlice) + if svc.Name == ep.Labels["kubernetes.io/service-name"] && svc.Namespace == ep.Namespace { + endpointSlices = append(endpointSlices, ep) } } - return ep, fmt.Errorf("could not find endpoints for service: %v", svc.Name) + if len(endpointSlices) > 0 { + return endpointSlices, nil + } + return endpointSlices, fmt.Errorf("could not find endpointslices for service: %v", svc.Name) } // findPort locates the container port for the given pod and portName. If the