From 82695f18a3bd734303c4efdc3e737ee8b7b473d7 Mon Sep 17 00:00:00 2001 From: tombokombo Date: Sun, 31 Jul 2022 11:55:39 +0200 Subject: [PATCH 1/6] endpointslices Signed-off-by: tombokombo --- .../ingress-nginx/templates/clusterrole.yaml | 8 + internal/ingress/controller/controller.go | 13 +- .../ingress/controller/controller_test.go | 5 + .../{endpoints.go => endpointslices.go} | 92 ++-- ...dpoints_test.go => endpointslices_test.go} | 392 ++++++++++++------ .../ingress/controller/store/endpointslice.go | 52 +++ internal/ingress/controller/store/store.go | 54 ++- 7 files changed, 440 insertions(+), 176 deletions(-) rename internal/ingress/controller/{endpoints.go => endpointslices.go} (55%) rename internal/ingress/controller/{endpoints_test.go => endpointslices_test.go} (50%) create mode 100644 internal/ingress/controller/store/endpointslice.go diff --git a/charts/ingress-nginx/templates/clusterrole.yaml b/charts/ingress-nginx/templates/clusterrole.yaml index 0e725ec06c..51bc5002cc 100644 --- a/charts/ingress-nginx/templates/clusterrole.yaml +++ b/charts/ingress-nginx/templates/clusterrole.yaml @@ -89,6 +89,14 @@ rules: - get - list - watch + - apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - list + - watch + - get {{- end }} {{- end }} diff --git a/internal/ingress/controller/controller.go b/internal/ingress/controller/controller.go index d0f85bb011..562eb68e62 100644 --- a/internal/ingress/controller/controller.go +++ b/internal/ingress/controller/controller.go @@ -433,7 +433,7 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr sp := svc.Spec.Ports[i] if sp.Name == svcPort { if sp.Protocol == proto { - endps = getEndpoints(svc, &sp, proto, n.store.GetServiceEndpoints) + endps = getEndpointsFromSlices(svc, &sp, proto, n.store.GetServiceEndpointsSlices) break } } @@ -444,7 +444,7 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr sp := svc.Spec.Ports[i] if sp.Port == int32(targetPort) { if sp.Protocol == proto { - endps = getEndpoints(svc, &sp, proto, n.store.GetServiceEndpoints) + endps = getEndpointsFromSlices(svc, &sp, proto, n.store.GetServiceEndpointsSlices) break } } @@ -496,7 +496,7 @@ func (n *NGINXController) getDefaultUpstream() *ingress.Backend { return upstream } - endps := getEndpoints(svc, &svc.Spec.Ports[0], apiv1.ProtocolTCP, n.store.GetServiceEndpoints) + endps := getEndpointsFromSlices(svc, &svc.Spec.Ports[0], apiv1.ProtocolTCP, n.store.GetServiceEndpointsSlices) if len(endps) == 0 { klog.Warningf("Service %q does not have any active Endpoint", svcKey) endps = []ingress.Endpoint{n.DefaultEndpoint()} @@ -823,7 +823,7 @@ func (n *NGINXController) getBackendServers(ingresses []*ingress.Ingress) ([]*in } sp := location.DefaultBackend.Spec.Ports[0] - endps := getEndpoints(location.DefaultBackend, &sp, apiv1.ProtocolTCP, n.store.GetServiceEndpoints) + endps := getEndpointsFromSlices(location.DefaultBackend, &sp, apiv1.ProtocolTCP, n.store.GetServiceEndpointsSlices) // custom backend is valid only if contains at least one endpoint if len(endps) > 0 { name := fmt.Sprintf("custom-default-backend-%v-%v", location.DefaultBackend.GetNamespace(), location.DefaultBackend.GetName()) @@ -1081,7 +1081,6 @@ func (n *NGINXController) serviceEndpoints(svcKey, backendPort string) ([]ingres } klog.V(3).Infof("Obtaining ports information for Service %q", svcKey) - // Ingress with an ExternalName Service and no port defined for that Service if svc.Spec.Type == apiv1.ServiceTypeExternalName { if n.cfg.DisableServiceExternalName { @@ -1089,7 +1088,7 @@ func (n *NGINXController) serviceEndpoints(svcKey, backendPort string) ([]ingres return upstreams, nil } servicePort := externalNamePorts(backendPort, svc) - endps := getEndpoints(svc, servicePort, apiv1.ProtocolTCP, n.store.GetServiceEndpoints) + endps := getEndpointsFromSlices(svc, servicePort, apiv1.ProtocolTCP, n.store.GetServiceEndpointsSlices) if len(endps) == 0 { klog.Warningf("Service %q does not have any active Endpoint.", svcKey) return upstreams, nil @@ -1106,7 +1105,7 @@ func (n *NGINXController) serviceEndpoints(svcKey, backendPort string) ([]ingres servicePort.TargetPort.String() == backendPort || servicePort.Name == backendPort { - endps := getEndpoints(svc, &servicePort, apiv1.ProtocolTCP, n.store.GetServiceEndpoints) + endps := getEndpointsFromSlices(svc, &servicePort, apiv1.ProtocolTCP, n.store.GetServiceEndpointsSlices) if len(endps) == 0 { klog.Warningf("Service %q does not have any active Endpoint.", svcKey) } diff --git a/internal/ingress/controller/controller_test.go b/internal/ingress/controller/controller_test.go index f875632e09..0754b29eca 100644 --- a/internal/ingress/controller/controller_test.go +++ b/internal/ingress/controller/controller_test.go @@ -34,6 +34,7 @@ import ( "github.com/eapache/channels" corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" networking "k8s.io/api/networking/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -89,6 +90,10 @@ func (fakeIngressStore) GetServiceEndpoints(key string) (*corev1.Endpoints, erro return nil, fmt.Errorf("test error") } +func (fakeIngressStore) GetServiceEndpointsSlices(key string) ([]*discoveryv1.EndpointSlice, error) { + return nil, fmt.Errorf("test error") +} + func (fis fakeIngressStore) ListIngresses() []*ingress.Ingress { return fis.ingresses } diff --git a/internal/ingress/controller/endpoints.go b/internal/ingress/controller/endpointslices.go similarity index 55% rename from internal/ingress/controller/endpoints.go rename to internal/ingress/controller/endpointslices.go index 5615fadea1..efd2a6afa1 100644 --- a/internal/ingress/controller/endpoints.go +++ b/internal/ingress/controller/endpointslices.go @@ -28,14 +28,15 @@ import ( "k8s.io/klog/v2" corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" "k8s.io/ingress-nginx/internal/k8s" "k8s.io/ingress-nginx/pkg/apis/ingress" ) // getEndpoints returns a list of Endpoint structs for a given service/target port combination. -func getEndpoints(s *corev1.Service, port *corev1.ServicePort, proto corev1.Protocol, - getServiceEndpoints func(string) (*corev1.Endpoints, error)) []ingress.Endpoint { +func getEndpointsFromSlices(s *corev1.Service, port *corev1.ServicePort, proto corev1.Protocol, + getServiceEndpointsSlices func(string) ([]*discoveryv1.EndpointSlice, error)) []ingress.Endpoint { upsServers := []ingress.Endpoint{} @@ -74,54 +75,63 @@ func getEndpoints(s *corev1.Service, port *corev1.ServicePort, proto corev1.Prot }) } - klog.V(3).Infof("Getting Endpoints for Service %q and port %v", svcKey, port.String()) - ep, err := getServiceEndpoints(svcKey) + klog.V(3).Infof("Getting Endpoints from endpointSlices for Service %q and port %v", svcKey, port.String()) + epss, err := getServiceEndpointsSlices(svcKey) if err != nil { klog.Warningf("Error obtaining Endpoints for Service %q: %v", svcKey, err) return upsServers } - - for _, ss := range ep.Subsets { - matchedPortNameFound := false - for i, epPort := range ss.Ports { - - if !reflect.DeepEqual(epPort.Protocol, proto) { - continue - } - - var targetPort int32 - - if port.Name == "" { - // port.Name is optional if there is only one port - targetPort = epPort.Port - matchedPortNameFound = true - } else if port.Name == epPort.Name { - targetPort = epPort.Port - matchedPortNameFound = true - } - - if i == len(ss.Ports)-1 && !matchedPortNameFound && port.TargetPort.Type == intstr.Int { - // use service target port if it's a number and no port name matched - // https://github.com/kubernetes/ingress-nginx/issues/7390 - targetPort = port.TargetPort.IntVal + // loop over all endpointSlices generated for service + for _, eps := range epss { + var ports []int32 + if len(eps.Ports) == 0 { + // When ports is empty, it indicates that there are no defined ports, using svc targePort <- this could be wrong + klog.V(3).Infof("No ports found on endpointSlice, using service TargetPort %v for Service %q", port.String(), svcKey) + ports = append(ports, port.TargetPort.IntVal) + } else { + for _, epPort := range eps.Ports { + if !reflect.DeepEqual(*epPort.Protocol, proto) { + continue + } + var targetPort int32 = 0 + if port.Name == "" { + // port.Name is optional if there is only one port + targetPort = *epPort.Port + } else if port.Name == *epPort.Name { + targetPort = *epPort.Port + } + if targetPort == 0 && port.TargetPort.Type == intstr.Int { + // use service target port if it's a number and no port name matched + // https://github.com/kubernetes/ingress-nginx/issues/7390 + targetPort = port.TargetPort.IntVal + } + if targetPort == 0 { + continue + } + ports = append(ports, targetPort) } - - if targetPort <= 0 { + } + for _, ep := range eps.Endpoints { + if !(*ep.Conditions.Ready) { continue } - for _, epAddress := range ss.Addresses { - ep := net.JoinHostPort(epAddress.IP, strconv.Itoa(int(targetPort))) - if _, exists := processedUpstreamServers[ep]; exists { - continue - } - ups := ingress.Endpoint{ - Address: epAddress.IP, - Port: fmt.Sprintf("%v", targetPort), - Target: epAddress.TargetRef, + // ep.Hints + + for _, epPort := range ports { + for _, epAddress := range ep.Addresses { + hostPort := net.JoinHostPort(epAddress, strconv.Itoa(int(epPort))) + if _, exists := processedUpstreamServers[hostPort]; exists { + continue + } + ups := ingress.Endpoint{ + Address: epAddress, + Port: fmt.Sprintf("%v", epPort), + Target: ep.TargetRef, + } + upsServers = append(upsServers, ups) + processedUpstreamServers[hostPort] = struct{}{} } - upsServers = append(upsServers, ups) - processedUpstreamServers[ep] = struct{}{} } } } diff --git a/internal/ingress/controller/endpoints_test.go b/internal/ingress/controller/endpointslices_test.go similarity index 50% rename from internal/ingress/controller/endpoints_test.go rename to internal/ingress/controller/endpointslices_test.go index f38ffabe78..e404c4949f 100644 --- a/internal/ingress/controller/endpoints_test.go +++ b/internal/ingress/controller/endpointslices_test.go @@ -21,17 +21,19 @@ import ( "testing" corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/ingress-nginx/pkg/apis/ingress" ) -func TestGetEndpoints(t *testing.T) { +func TestGetEndpointsFromSlices(t *testing.T) { tests := []struct { name string svc *corev1.Service port *corev1.ServicePort proto corev1.Protocol - fn func(string) (*corev1.Endpoints, error) + fn func(string) ([]*discoveryv1.EndpointSlice, error) result []ingress.Endpoint }{ { @@ -39,7 +41,7 @@ func TestGetEndpoints(t *testing.T) { nil, nil, corev1.ProtocolTCP, - func(string) (*corev1.Endpoints, error) { + func(string) ([]*discoveryv1.EndpointSlice, error) { return nil, nil }, []ingress.Endpoint{}, @@ -49,7 +51,7 @@ func TestGetEndpoints(t *testing.T) { &corev1.Service{}, nil, corev1.ProtocolTCP, - func(string) (*corev1.Endpoints, error) { + func(string) ([]*discoveryv1.EndpointSlice, error) { return nil, nil }, []ingress.Endpoint{}, @@ -59,8 +61,8 @@ func TestGetEndpoints(t *testing.T) { &corev1.Service{}, &corev1.ServicePort{Name: "default"}, corev1.ProtocolTCP, - func(string) (*corev1.Endpoints, error) { - return &corev1.Endpoints{}, nil + func(string) ([]*discoveryv1.EndpointSlice, error) { + return []*discoveryv1.EndpointSlice{}, nil }, []ingress.Endpoint{}, }, @@ -73,8 +75,8 @@ func TestGetEndpoints(t *testing.T) { }, &corev1.ServicePort{Name: "default"}, corev1.ProtocolTCP, - func(string) (*corev1.Endpoints, error) { - return &corev1.Endpoints{}, nil + func(string) ([]*discoveryv1.EndpointSlice, error) { + return []*discoveryv1.EndpointSlice{}, nil }, []ingress.Endpoint{}, }, @@ -97,8 +99,8 @@ func TestGetEndpoints(t *testing.T) { TargetPort: intstr.FromInt(80), }, corev1.ProtocolTCP, - func(string) (*corev1.Endpoints, error) { - return &corev1.Endpoints{}, nil + func(string) ([]*discoveryv1.EndpointSlice, error) { + return []*discoveryv1.EndpointSlice{}, nil }, []ingress.Endpoint{}, }, @@ -121,8 +123,8 @@ func TestGetEndpoints(t *testing.T) { TargetPort: intstr.FromInt(80), }, corev1.ProtocolTCP, - func(string) (*corev1.Endpoints, error) { - return &corev1.Endpoints{}, nil + func(string) ([]*discoveryv1.EndpointSlice, error) { + return []*discoveryv1.EndpointSlice{}, nil }, []ingress.Endpoint{}, }, @@ -145,8 +147,8 @@ func TestGetEndpoints(t *testing.T) { TargetPort: intstr.FromInt(80), }, corev1.ProtocolTCP, - func(string) (*corev1.Endpoints, error) { - return &corev1.Endpoints{}, nil + func(string) ([]*discoveryv1.EndpointSlice, error) { + return []*discoveryv1.EndpointSlice{}, nil }, []ingress.Endpoint{ { @@ -174,8 +176,8 @@ func TestGetEndpoints(t *testing.T) { TargetPort: intstr.FromInt(80), }, corev1.ProtocolTCP, - func(string) (*corev1.Endpoints, error) { - return &corev1.Endpoints{}, nil + func(string) ([]*discoveryv1.EndpointSlice, error) { + return []*discoveryv1.EndpointSlice{}, nil }, []ingress.Endpoint{ { @@ -203,8 +205,8 @@ func TestGetEndpoints(t *testing.T) { TargetPort: intstr.FromInt(80), }, corev1.ProtocolTCP, - func(string) (*corev1.Endpoints, error) { - return &corev1.Endpoints{}, nil + func(string) ([]*discoveryv1.EndpointSlice, error) { + return []*discoveryv1.EndpointSlice{}, nil }, []ingress.Endpoint{}, }, @@ -227,7 +229,7 @@ func TestGetEndpoints(t *testing.T) { TargetPort: intstr.FromInt(80), }, corev1.ProtocolTCP, - func(string) (*corev1.Endpoints, error) { + func(string) ([]*discoveryv1.EndpointSlice, error) { return nil, fmt.Errorf("unexpected error") }, []ingress.Endpoint{}, @@ -251,25 +253,27 @@ func TestGetEndpoints(t *testing.T) { TargetPort: intstr.FromInt(80), }, corev1.ProtocolTCP, - func(string) (*corev1.Endpoints, error) { - nodeName := "dummy" - return &corev1.Endpoints{ - Subsets: []corev1.EndpointSubset{ + func(string) ([]*discoveryv1.EndpointSlice, error) { + return []*discoveryv1.EndpointSlice{{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{discoveryv1.LabelServiceName: "default"}, + }, + Endpoints: []discoveryv1.Endpoint{ { - Addresses: []corev1.EndpointAddress{ - { - IP: "1.1.1.1", - NodeName: &nodeName, - }, - }, - Ports: []corev1.EndpointPort{ - { - Protocol: corev1.ProtocolUDP, - }, + Addresses: []string{"1.1.1.1"}, + Conditions: discoveryv1.EndpointConditions{ + Ready: &[]bool{true}[0], }, }, }, - }, nil + Ports: []discoveryv1.EndpointPort{ + { + Name: &[]string{""}[0], + Port: &[]int32{80}[0], + Protocol: &[]corev1.Protocol{corev1.ProtocolUDP}[0], + }, + }, + }}, nil }, []ingress.Endpoint{}, }, @@ -292,30 +296,32 @@ func TestGetEndpoints(t *testing.T) { TargetPort: intstr.FromInt(80), }, corev1.ProtocolTCP, - func(string) (*corev1.Endpoints, error) { - nodeName := "dummy" - return &corev1.Endpoints{ - Subsets: []corev1.EndpointSubset{ + func(string) ([]*discoveryv1.EndpointSlice, error) { + return []*discoveryv1.EndpointSlice{{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{discoveryv1.LabelServiceName: "default"}, + }, + Endpoints: []discoveryv1.Endpoint{ { - NotReadyAddresses: []corev1.EndpointAddress{ - { - IP: "1.1.1.1", - NodeName: &nodeName, - }, - }, - Ports: []corev1.EndpointPort{ - { - Protocol: corev1.ProtocolUDP, - }, + Addresses: []string{"1.1.1.1"}, + Conditions: discoveryv1.EndpointConditions{ + Ready: &[]bool{false}[0], }, }, }, - }, nil + Ports: []discoveryv1.EndpointPort{ + { + Name: &[]string{""}[0], + Port: &[]int32{80}[0], + Protocol: &[]corev1.Protocol{corev1.ProtocolTCP}[0], + }, + }, + }}, nil }, []ingress.Endpoint{}, }, { - "should return no endpoint when the name of the port name do not match any port in the endpoint Subsets and TargetPort is string", + "should return no endpoint when the name of the port name do not match any port in the endpointPort and TargetPort is string", &corev1.Service{ Spec: corev1.ServiceSpec{ Type: corev1.ServiceTypeClusterIP, @@ -333,32 +339,32 @@ func TestGetEndpoints(t *testing.T) { TargetPort: intstr.FromString("port-1"), }, corev1.ProtocolTCP, - func(string) (*corev1.Endpoints, error) { - nodeName := "dummy" - return &corev1.Endpoints{ - Subsets: []corev1.EndpointSubset{ + func(string) ([]*discoveryv1.EndpointSlice, error) { + return []*discoveryv1.EndpointSlice{{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{discoveryv1.LabelServiceName: "default"}, + }, + Endpoints: []discoveryv1.Endpoint{ { - Addresses: []corev1.EndpointAddress{ - { - IP: "1.1.1.1", - NodeName: &nodeName, - }, - }, - Ports: []corev1.EndpointPort{ - { - Protocol: corev1.ProtocolTCP, - Port: int32(80), - Name: "another-name", - }, + Addresses: []string{"1.1.1.1"}, + Conditions: discoveryv1.EndpointConditions{ + Ready: &[]bool{true}[0], }, }, }, - }, nil + Ports: []discoveryv1.EndpointPort{ + { + Protocol: &[]corev1.Protocol{corev1.ProtocolTCP}[0], + Port: &[]int32{80}[0], + Name: &[]string{"another-name"}[0], + }, + }, + }}, nil }, []ingress.Endpoint{}, }, { - "should return one endpoint when the name of the port name do not match any port in the endpoint Subsets and TargetPort is int", + "should return one endpoint when the name of the port name do not match any port in the endpointPort and TargetPort is int", &corev1.Service{ Spec: corev1.ServiceSpec{ Type: corev1.ServiceTypeClusterIP, @@ -376,27 +382,27 @@ func TestGetEndpoints(t *testing.T) { TargetPort: intstr.FromInt(80), }, corev1.ProtocolTCP, - func(string) (*corev1.Endpoints, error) { - nodeName := "dummy" - return &corev1.Endpoints{ - Subsets: []corev1.EndpointSubset{ + func(string) ([]*discoveryv1.EndpointSlice, error) { + return []*discoveryv1.EndpointSlice{{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{discoveryv1.LabelServiceName: "default"}, + }, + Endpoints: []discoveryv1.Endpoint{ { - Addresses: []corev1.EndpointAddress{ - { - IP: "1.1.1.1", - NodeName: &nodeName, - }, - }, - Ports: []corev1.EndpointPort{ - { - Protocol: corev1.ProtocolTCP, - Port: int32(80), - Name: "another-name", - }, + Addresses: []string{"1.1.1.1"}, + Conditions: discoveryv1.EndpointConditions{ + Ready: &[]bool{true}[0], }, }, }, - }, nil + Ports: []discoveryv1.EndpointPort{ + { + Protocol: &[]corev1.Protocol{corev1.ProtocolTCP}[0], + Port: &[]int32{80}[0], + Name: &[]string{"another-name"}[0], + }, + }, + }}, nil }, []ingress.Endpoint{ { @@ -406,7 +412,7 @@ func TestGetEndpoints(t *testing.T) { }, }, { - "should return one endpoint when the name of the port name match a port in the endpoint Subsets", + "should return one endpoint when the name of the port name match a port in the endpointPort", &corev1.Service{ Spec: corev1.ServiceSpec{ Type: corev1.ServiceTypeClusterIP, @@ -424,25 +430,95 @@ func TestGetEndpoints(t *testing.T) { TargetPort: intstr.FromInt(80), }, corev1.ProtocolTCP, - func(string) (*corev1.Endpoints, error) { - nodeName := "dummy" - return &corev1.Endpoints{ - Subsets: []corev1.EndpointSubset{ + func(string) ([]*discoveryv1.EndpointSlice, error) { + return []*discoveryv1.EndpointSlice{{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{discoveryv1.LabelServiceName: "default"}, + }, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"1.1.1.1"}, + Conditions: discoveryv1.EndpointConditions{ + Ready: &[]bool{true}[0], + }, + }, + }, + Ports: []discoveryv1.EndpointPort{ { - Addresses: []corev1.EndpointAddress{ - { - IP: "1.1.1.1", - NodeName: &nodeName, + Protocol: &[]corev1.Protocol{corev1.ProtocolTCP}[0], + Port: &[]int32{80}[0], + Name: &[]string{"default"}[0], + }, + }, + }}, nil + }, + []ingress.Endpoint{ + { + Address: "1.1.1.1", + Port: "80", + }, + }, + }, + { + "should return two endpoints when the name of the port name match a port in the endpointPort", + &corev1.Service{ + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeClusterIP, + ClusterIP: "1.1.1.1", + Ports: []corev1.ServicePort{ + { + Name: "default", + TargetPort: intstr.FromInt(80), + }, + }, + }, + }, + &corev1.ServicePort{ + Name: "default", + TargetPort: intstr.FromString("port-1"), + }, + corev1.ProtocolTCP, + func(string) ([]*discoveryv1.EndpointSlice, error) { + return []*discoveryv1.EndpointSlice{ + { + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{discoveryv1.LabelServiceName: "default"}, + }, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"1.1.1.1"}, + Conditions: discoveryv1.EndpointConditions{ + Ready: &[]bool{true}[0], }, }, - Ports: []corev1.EndpointPort{ - { - Protocol: corev1.ProtocolTCP, - Port: int32(80), - Name: "default", + }, + Ports: []discoveryv1.EndpointPort{ + { + Protocol: &[]corev1.Protocol{corev1.ProtocolTCP}[0], + Port: &[]int32{80}[0], + Name: &[]string{"default"}[0], + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{discoveryv1.LabelServiceName: "default"}, + }, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"2.2.2.2"}, + Conditions: discoveryv1.EndpointConditions{ + Ready: &[]bool{true}[0], }, }, }, + Ports: []discoveryv1.EndpointPort{ + { + Protocol: &[]corev1.Protocol{corev1.ProtocolTCP}[0], + Port: &[]int32{80}[0], + Name: &[]string{"default"}[0], + }, + }, }, }, nil }, @@ -451,10 +527,14 @@ func TestGetEndpoints(t *testing.T) { Address: "1.1.1.1", Port: "80", }, + { + Address: "2.2.2.2", + Port: "80", + }, }, }, { - "should return one endpoint when the name of the port name match more than one port in the endpoint Subsets", + "should return one endpoints when the name of the port name match a port in the endpointPort", &corev1.Service{ Spec: corev1.ServiceSpec{ Type: corev1.ServiceTypeClusterIP, @@ -462,7 +542,7 @@ func TestGetEndpoints(t *testing.T) { Ports: []corev1.ServicePort{ { Name: "default", - TargetPort: intstr.FromString("port-1"), + TargetPort: intstr.FromInt(80), }, }, }, @@ -472,30 +552,47 @@ func TestGetEndpoints(t *testing.T) { TargetPort: intstr.FromString("port-1"), }, corev1.ProtocolTCP, - func(string) (*corev1.Endpoints, error) { - nodeName := "dummy" - return &corev1.Endpoints{ - Subsets: []corev1.EndpointSubset{ - { - Addresses: []corev1.EndpointAddress{ - { - IP: "1.1.1.1", - NodeName: &nodeName, + func(string) ([]*discoveryv1.EndpointSlice, error) { + return []*discoveryv1.EndpointSlice{ + { + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{discoveryv1.LabelServiceName: "default"}, + }, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"1.1.1.1"}, + Conditions: discoveryv1.EndpointConditions{ + Ready: &[]bool{true}[0], }, }, - Ports: []corev1.EndpointPort{ - { - Name: "port-1", - Protocol: corev1.ProtocolTCP, - Port: 80, - }, - { - Name: "port-1", - Protocol: corev1.ProtocolTCP, - Port: 80, + }, + Ports: []discoveryv1.EndpointPort{ + { + Protocol: &[]corev1.Protocol{corev1.ProtocolTCP}[0], + Port: &[]int32{80}[0], + Name: &[]string{"port-1"}[0], + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{discoveryv1.LabelServiceName: "default"}, + }, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"2.2.2.2"}, + Conditions: discoveryv1.EndpointConditions{ + Ready: &[]bool{true}[0], }, }, }, + Ports: []discoveryv1.EndpointPort{ + { + Protocol: &[]corev1.Protocol{corev1.ProtocolTCP}[0], + Port: &[]int32{80}[0], + Name: &[]string{"another-name"}[0], + }, + }, }, }, nil }, @@ -506,11 +603,64 @@ func TestGetEndpoints(t *testing.T) { }, }, }, + { + "should return one endpoint when the name of the port name match more than one port in the endpointPort", + &corev1.Service{ + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeClusterIP, + ClusterIP: "1.1.1.1", + Ports: []corev1.ServicePort{ + { + Name: "default", + TargetPort: intstr.FromString("port-1"), + }, + }, + }, + }, + &corev1.ServicePort{ + Name: "port-1", + TargetPort: intstr.FromString("port-1"), + }, + corev1.ProtocolTCP, + func(string) ([]*discoveryv1.EndpointSlice, error) { + return []*discoveryv1.EndpointSlice{{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{discoveryv1.LabelServiceName: "default"}, + }, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"1.1.1.1"}, + Conditions: discoveryv1.EndpointConditions{ + Ready: &[]bool{true}[0], + }, + }, + }, + Ports: []discoveryv1.EndpointPort{ + { + Protocol: &[]corev1.Protocol{corev1.ProtocolTCP}[0], + Port: &[]int32{80}[0], + Name: &[]string{"port-1"}[0], + }, + { + Protocol: &[]corev1.Protocol{corev1.ProtocolTCP}[0], + Port: &[]int32{80}[0], + Name: &[]string{"port-2"}[0], + }, + }, + }}, nil + }, + []ingress.Endpoint{ + { + Address: "1.1.1.1", + Port: "80", + }, + }, + }, } for _, testCase := range tests { t.Run(testCase.name, func(t *testing.T) { - result := getEndpoints(testCase.svc, testCase.port, testCase.proto, testCase.fn) + result := getEndpointsFromSlices(testCase.svc, testCase.port, testCase.proto, testCase.fn) if len(testCase.result) != len(result) { t.Errorf("Expected %d Endpoints but got %d", len(testCase.result), len(result)) } diff --git a/internal/ingress/controller/store/endpointslice.go b/internal/ingress/controller/store/endpointslice.go new file mode 100644 index 0000000000..de3fce9875 --- /dev/null +++ b/internal/ingress/controller/store/endpointslice.go @@ -0,0 +1,52 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package store + +import ( + "fmt" + "strings" + + discoveryv1 "k8s.io/api/discovery/v1" + "k8s.io/client-go/tools/cache" +) + +// EndpointLister makes a Store that lists Endpoints. +type EndpointSliceLister struct { + cache.Store +} + +// ByKey returns the EndpointsSlices of the Service matching key in the local Endpoint Store. +func (s *EndpointSliceLister) MatchByKey(key string) ([]*discoveryv1.EndpointSlice, error) { + var eps []*discoveryv1.EndpointSlice + // filter endpointSlices owned by svc + for _, listKey := range s.ListKeys() { + if strings.HasPrefix(listKey, key) { + epss, exists, err := s.GetByKey(listKey) + if exists && err == nil { + svcName := epss.(*discoveryv1.EndpointSlice).ObjectMeta.GetLabels()[discoveryv1.LabelServiceName] + namespace := epss.(*discoveryv1.EndpointSlice).ObjectMeta.GetNamespace() + if key == fmt.Sprintf("%s/%s", namespace, svcName) { + eps = append(eps, epss.(*discoveryv1.EndpointSlice)) + } + } + } + } + if len(eps) == 0 { + return nil, NotExistsError(key) + } + return eps, nil +} diff --git a/internal/ingress/controller/store/store.go b/internal/ingress/controller/store/store.go index 239fbaaf5a..5bb9d097f4 100644 --- a/internal/ingress/controller/store/store.go +++ b/internal/ingress/controller/store/store.go @@ -29,6 +29,7 @@ import ( "github.com/eapache/channels" corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" networkingv1 "k8s.io/api/networking/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" @@ -81,6 +82,9 @@ type Storer interface { // GetServiceEndpoints returns the Endpoints of a Service matching key. GetServiceEndpoints(key string) (*corev1.Endpoints, error) + // GetServiceEndpointsSlices returns the EndpointSlices of a Service matching key. + GetServiceEndpointsSlices(key string) ([]*discoveryv1.EndpointSlice, error) + // ListIngresses returns a list of all Ingresses in the store. ListIngresses() []*ingress.Ingress @@ -127,13 +131,14 @@ type Event struct { // Informer defines the required SharedIndexInformers that interact with the API server. type Informer struct { - Ingress cache.SharedIndexInformer - IngressClass cache.SharedIndexInformer - Endpoint cache.SharedIndexInformer - Service cache.SharedIndexInformer - Secret cache.SharedIndexInformer - ConfigMap cache.SharedIndexInformer - Namespace cache.SharedIndexInformer + Ingress cache.SharedIndexInformer + IngressClass cache.SharedIndexInformer + Endpoint cache.SharedIndexInformer + EndpointSlice cache.SharedIndexInformer + Service cache.SharedIndexInformer + Secret cache.SharedIndexInformer + ConfigMap cache.SharedIndexInformer + Namespace cache.SharedIndexInformer } // Lister contains object listers (stores). @@ -142,6 +147,7 @@ type Lister struct { IngressClass IngressClassLister Service ServiceLister Endpoint EndpointLister + EndpointSlice EndpointSliceLister Secret SecretLister ConfigMap ConfigMapLister Namespace NamespaceLister @@ -160,6 +166,7 @@ func (e NotExistsError) Error() string { func (i *Informer) Run(stopCh chan struct{}) { go i.Secret.Run(stopCh) go i.Endpoint.Run(stopCh) + go i.EndpointSlice.Run(stopCh) if i.IngressClass != nil { go i.IngressClass.Run(stopCh) } @@ -333,6 +340,9 @@ func New( store.informers.Endpoint = infFactory.Core().V1().Endpoints().Informer() store.listers.Endpoint.Store = store.informers.Endpoint.GetStore() + store.informers.EndpointSlice = infFactory.Discovery().V1().EndpointSlices().Informer() + store.listers.EndpointSlice.Store = store.informers.EndpointSlice.GetStore() + store.informers.Secret = infFactorySecrets.Core().V1().Secrets().Informer() store.listers.Secret.Store = store.informers.Secret.GetStore() @@ -698,6 +708,31 @@ func New( }, } + epsEventHandler := cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + updateCh.In() <- Event{ + Type: CreateEvent, + Obj: obj, + } + }, + DeleteFunc: func(obj interface{}) { + updateCh.In() <- Event{ + Type: DeleteEvent, + Obj: obj, + } + }, + UpdateFunc: func(old, cur interface{}) { + oeps := old.(*discoveryv1.EndpointSlice) + ceps := cur.(*discoveryv1.EndpointSlice) + if !reflect.DeepEqual(ceps.Endpoints, oeps.Endpoints) { + updateCh.In() <- Event{ + Type: UpdateEvent, + Obj: cur, + } + } + }, + } + // TODO: add e2e test to verify that changes to one or more configmap trigger an update changeTriggerUpdate := func(name string) bool { return name == configmap || name == tcp || name == udp @@ -797,6 +832,7 @@ func New( store.informers.IngressClass.AddEventHandler(ingressClassEventHandler) } store.informers.Endpoint.AddEventHandler(epEventHandler) + store.informers.EndpointSlice.AddEventHandler(epsEventHandler) store.informers.Secret.AddEventHandler(secrEventHandler) store.informers.ConfigMap.AddEventHandler(cmEventHandler) store.informers.Service.AddEventHandler(serviceHandler) @@ -1049,6 +1085,10 @@ func (s *k8sStore) GetServiceEndpoints(key string) (*corev1.Endpoints, error) { return s.listers.Endpoint.ByKey(key) } +func (s *k8sStore) GetServiceEndpointsSlices(key string) ([]*discoveryv1.EndpointSlice, error) { + return s.listers.EndpointSlice.MatchByKey(key) +} + // GetAuthCertificate is used by the auth-tls annotations to get a cert from a secret func (s *k8sStore) GetAuthCertificate(name string) (*resolver.AuthSSLCert, error) { if _, err := s.GetLocalSSLCert(name); err != nil { From 1c13825bbe248219c7b24540ce8d13fbc37f9591 Mon Sep 17 00:00:00 2001 From: tombokombo Date: Mon, 1 Aug 2022 01:08:40 +0200 Subject: [PATCH 2/6] cleanup Signed-off-by: tombokombo --- .../ingress/controller/controller_test.go | 4 -- internal/ingress/controller/store/store.go | 41 ------------------- 2 files changed, 45 deletions(-) diff --git a/internal/ingress/controller/controller_test.go b/internal/ingress/controller/controller_test.go index 0754b29eca..da9f10e454 100644 --- a/internal/ingress/controller/controller_test.go +++ b/internal/ingress/controller/controller_test.go @@ -86,10 +86,6 @@ func (fakeIngressStore) GetService(key string) (*corev1.Service, error) { return nil, fmt.Errorf("test error") } -func (fakeIngressStore) GetServiceEndpoints(key string) (*corev1.Endpoints, error) { - return nil, fmt.Errorf("test error") -} - func (fakeIngressStore) GetServiceEndpointsSlices(key string) ([]*discoveryv1.EndpointSlice, error) { return nil, fmt.Errorf("test error") } diff --git a/internal/ingress/controller/store/store.go b/internal/ingress/controller/store/store.go index 5bb9d097f4..7913eb0de3 100644 --- a/internal/ingress/controller/store/store.go +++ b/internal/ingress/controller/store/store.go @@ -79,9 +79,6 @@ type Storer interface { // GetService returns the Service matching key. GetService(key string) (*corev1.Service, error) - // GetServiceEndpoints returns the Endpoints of a Service matching key. - GetServiceEndpoints(key string) (*corev1.Endpoints, error) - // GetServiceEndpointsSlices returns the EndpointSlices of a Service matching key. GetServiceEndpointsSlices(key string) ([]*discoveryv1.EndpointSlice, error) @@ -133,7 +130,6 @@ type Event struct { type Informer struct { Ingress cache.SharedIndexInformer IngressClass cache.SharedIndexInformer - Endpoint cache.SharedIndexInformer EndpointSlice cache.SharedIndexInformer Service cache.SharedIndexInformer Secret cache.SharedIndexInformer @@ -146,7 +142,6 @@ type Lister struct { Ingress IngressLister IngressClass IngressClassLister Service ServiceLister - Endpoint EndpointLister EndpointSlice EndpointSliceLister Secret SecretLister ConfigMap ConfigMapLister @@ -165,7 +160,6 @@ func (e NotExistsError) Error() string { // Run initiates the synchronization of the informers against the API server. func (i *Informer) Run(stopCh chan struct{}) { go i.Secret.Run(stopCh) - go i.Endpoint.Run(stopCh) go i.EndpointSlice.Run(stopCh) if i.IngressClass != nil { go i.IngressClass.Run(stopCh) @@ -176,7 +170,6 @@ func (i *Informer) Run(stopCh chan struct{}) { // wait for all involved caches to be synced before processing items // from the queue if !cache.WaitForCacheSync(stopCh, - i.Endpoint.HasSynced, i.Service.HasSynced, i.Secret.HasSynced, i.ConfigMap.HasSynced, @@ -337,9 +330,6 @@ func New( store.listers.IngressClass.Store = cache.NewStore(cache.MetaNamespaceKeyFunc) } - store.informers.Endpoint = infFactory.Core().V1().Endpoints().Informer() - store.listers.Endpoint.Store = store.informers.Endpoint.GetStore() - store.informers.EndpointSlice = infFactory.Discovery().V1().EndpointSlices().Informer() store.listers.EndpointSlice.Store = store.informers.EndpointSlice.GetStore() @@ -683,31 +673,6 @@ func New( }, } - epEventHandler := cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - updateCh.In() <- Event{ - Type: CreateEvent, - Obj: obj, - } - }, - DeleteFunc: func(obj interface{}) { - updateCh.In() <- Event{ - Type: DeleteEvent, - Obj: obj, - } - }, - UpdateFunc: func(old, cur interface{}) { - oep := old.(*corev1.Endpoints) - cep := cur.(*corev1.Endpoints) - if !reflect.DeepEqual(cep.Subsets, oep.Subsets) { - updateCh.In() <- Event{ - Type: UpdateEvent, - Obj: cur, - } - } - }, - } - epsEventHandler := cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { updateCh.In() <- Event{ @@ -831,7 +796,6 @@ func New( if !icConfig.IgnoreIngressClass { store.informers.IngressClass.AddEventHandler(ingressClassEventHandler) } - store.informers.Endpoint.AddEventHandler(epEventHandler) store.informers.EndpointSlice.AddEventHandler(epsEventHandler) store.informers.Secret.AddEventHandler(secrEventHandler) store.informers.ConfigMap.AddEventHandler(cmEventHandler) @@ -1080,11 +1044,6 @@ func (s *k8sStore) GetConfigMap(key string) (*corev1.ConfigMap, error) { return s.listers.ConfigMap.ByKey(key) } -// GetServiceEndpoints returns the Endpoints of a Service matching key. -func (s *k8sStore) GetServiceEndpoints(key string) (*corev1.Endpoints, error) { - return s.listers.Endpoint.ByKey(key) -} - func (s *k8sStore) GetServiceEndpointsSlices(key string) ([]*discoveryv1.EndpointSlice, error) { return s.listers.EndpointSlice.MatchByKey(key) } From ad9342b13e1ead7b24bcfb8347a62f87c324f774 Mon Sep 17 00:00:00 2001 From: tombokombo Date: Mon, 1 Aug 2022 02:06:11 +0200 Subject: [PATCH 3/6] fix rbac Signed-off-by: tombokombo --- charts/ingress-nginx/templates/controller-role.yaml | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/charts/ingress-nginx/templates/controller-role.yaml b/charts/ingress-nginx/templates/controller-role.yaml index 8e5f8a0d7b..008f59c207 100644 --- a/charts/ingress-nginx/templates/controller-role.yaml +++ b/charts/ingress-nginx/templates/controller-role.yaml @@ -95,6 +95,14 @@ rules: verbs: - create - patch + - apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - list + - watch + - get {{- if .Values.podSecurityPolicy.enabled }} - apiGroups: [{{ template "podSecurityPolicy.apiGroup" . }}] resources: ['podsecuritypolicies'] From 9d847f8c868dc1cfe5d77596968a86e09fdedf90 Mon Sep 17 00:00:00 2001 From: tombokombo Date: Mon, 1 Aug 2022 14:50:21 +0200 Subject: [PATCH 4/6] fix comments Signed-off-by: tombokombo --- internal/ingress/controller/store/endpointslice.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/ingress/controller/store/endpointslice.go b/internal/ingress/controller/store/endpointslice.go index de3fce9875..525f13c5a2 100644 --- a/internal/ingress/controller/store/endpointslice.go +++ b/internal/ingress/controller/store/endpointslice.go @@ -24,12 +24,12 @@ import ( "k8s.io/client-go/tools/cache" ) -// EndpointLister makes a Store that lists Endpoints. +// EndpointSliceLister makes a Store that lists Endpoints. type EndpointSliceLister struct { cache.Store } -// ByKey returns the EndpointsSlices of the Service matching key in the local Endpoint Store. +// MatchByKey returns the EndpointsSlices of the Service matching key in the local Endpoint Store. func (s *EndpointSliceLister) MatchByKey(key string) ([]*discoveryv1.EndpointSlice, error) { var eps []*discoveryv1.EndpointSlice // filter endpointSlices owned by svc From 4cb2c47aa3928278308df6999e51dbac525ec93c Mon Sep 17 00:00:00 2001 From: tombokombo Date: Tue, 2 Aug 2022 03:37:33 +0200 Subject: [PATCH 5/6] cleanup store, add store tests Signed-off-by: tombokombo --- internal/ingress/controller/store/endpoint.go | 39 -------- .../ingress/controller/store/endpoint_test.go | 66 ------------- .../ingress/controller/store/endpointslice.go | 11 ++- .../controller/store/endpointslice_test.go | 94 +++++++++++++++++++ 4 files changed, 101 insertions(+), 109 deletions(-) delete mode 100644 internal/ingress/controller/store/endpoint.go delete mode 100644 internal/ingress/controller/store/endpoint_test.go create mode 100644 internal/ingress/controller/store/endpointslice_test.go diff --git a/internal/ingress/controller/store/endpoint.go b/internal/ingress/controller/store/endpoint.go deleted file mode 100644 index ff6fbd7150..0000000000 --- a/internal/ingress/controller/store/endpoint.go +++ /dev/null @@ -1,39 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package store - -import ( - apiv1 "k8s.io/api/core/v1" - "k8s.io/client-go/tools/cache" -) - -// EndpointLister makes a Store that lists Endpoints. -type EndpointLister struct { - cache.Store -} - -// ByKey returns the Endpoints of the Service matching key in the local Endpoint Store. -func (s *EndpointLister) ByKey(key string) (*apiv1.Endpoints, error) { - eps, exists, err := s.GetByKey(key) - if err != nil { - return nil, err - } - if !exists { - return nil, NotExistsError(key) - } - return eps.(*apiv1.Endpoints), nil -} diff --git a/internal/ingress/controller/store/endpoint_test.go b/internal/ingress/controller/store/endpoint_test.go deleted file mode 100644 index 6c8ae40e21..0000000000 --- a/internal/ingress/controller/store/endpoint_test.go +++ /dev/null @@ -1,66 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package store - -import ( - apiv1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/tools/cache" - "testing" -) - -func newEndpointLister(t *testing.T) *EndpointLister { - t.Helper() - - return &EndpointLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)} -} - -func TestEndpointLister(t *testing.T) { - t.Run("the key does not exist", func(t *testing.T) { - el := newEndpointLister(t) - - key := "namespace/endpoint" - _, err := el.ByKey(key) - - if err == nil { - t.Error("expected an error but nothing has been returned") - } - - if _, ok := err.(NotExistsError); !ok { - t.Errorf("expected NotExistsError, got %v", err) - } - }) - - t.Run("the key exists", func(t *testing.T) { - el := newEndpointLister(t) - - key := "namespace/endpoint" - endpoint := &apiv1.Endpoints{ObjectMeta: metav1.ObjectMeta{Namespace: "namespace", Name: "endpoint"}} - - el.Add(endpoint) - - e, err := el.ByKey(key) - - if err != nil { - t.Errorf("unexpeted error %v", err) - } - - if e != endpoint { - t.Errorf("expected %v, error, got %v", e, endpoint) - } - }) -} diff --git a/internal/ingress/controller/store/endpointslice.go b/internal/ingress/controller/store/endpointslice.go index 525f13c5a2..e7aece7c99 100644 --- a/internal/ingress/controller/store/endpointslice.go +++ b/internal/ingress/controller/store/endpointslice.go @@ -34,10 +34,13 @@ func (s *EndpointSliceLister) MatchByKey(key string) ([]*discoveryv1.EndpointSli var eps []*discoveryv1.EndpointSlice // filter endpointSlices owned by svc for _, listKey := range s.ListKeys() { - if strings.HasPrefix(listKey, key) { - epss, exists, err := s.GetByKey(listKey) - if exists && err == nil { - svcName := epss.(*discoveryv1.EndpointSlice).ObjectMeta.GetLabels()[discoveryv1.LabelServiceName] + if !strings.HasPrefix(listKey, key) { + continue + } + epss, exists, err := s.GetByKey(listKey) + if exists && err == nil { + // check for svc owner label + if svcName, ok := epss.(*discoveryv1.EndpointSlice).ObjectMeta.GetLabels()[discoveryv1.LabelServiceName]; ok { namespace := epss.(*discoveryv1.EndpointSlice).ObjectMeta.GetNamespace() if key == fmt.Sprintf("%s/%s", namespace, svcName) { eps = append(eps, epss.(*discoveryv1.EndpointSlice)) diff --git a/internal/ingress/controller/store/endpointslice_test.go b/internal/ingress/controller/store/endpointslice_test.go new file mode 100644 index 0000000000..fdc51c0e48 --- /dev/null +++ b/internal/ingress/controller/store/endpointslice_test.go @@ -0,0 +1,94 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package store + +import ( + "testing" + + discoveryv1 "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/cache" +) + +func newEndpointSliceLister(t *testing.T) *EndpointSliceLister { + t.Helper() + + return &EndpointSliceLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)} +} + +func TestEndpointSliceLister(t *testing.T) { + t.Run("the key does not exist", func(t *testing.T) { + el := newEndpointSliceLister(t) + + key := "namespace/svcname" + _, err := el.MatchByKey(key) + + if err == nil { + t.Error("expected an error but nothing has been returned") + } + + if _, ok := err.(NotExistsError); !ok { + t.Errorf("expected NotExistsError, got %v", err) + } + }) + t.Run("the key exists", func(t *testing.T) { + el := newEndpointSliceLister(t) + + key := "namespace/svcname" + endpointSlice := &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "namespace", + Name: "anothername-foo", + Labels: map[string]string{ + discoveryv1.LabelServiceName: "svcname", + }, + }, + } + el.Add(endpointSlice) + endpointSlice = &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "namespace", + Name: "svcname-bar", + Labels: map[string]string{ + discoveryv1.LabelServiceName: "othersvc", + }, + }, + } + el.Add(endpointSlice) + endpointSlice = &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "namespace", + Name: "svcname-buz", + Labels: map[string]string{ + discoveryv1.LabelServiceName: "svcname", + }, + }, + } + el.Add(endpointSlice) + eps, err := el.MatchByKey(key) + + if err != nil { + t.Errorf("unexpeted error %v", err) + } + if err == nil && len(eps) != 1 { + t.Errorf("expected one slice %v, error, got %d slices", endpointSlice, len(eps)) + } + if len(eps) > 0 && eps[0].GetName() != endpointSlice.GetName() { + t.Errorf("expected %v, error, got %v", endpointSlice.GetName(), eps[0].GetName()) + } + }) +} From 9855b5f3f9f952ec4a3f54c342b848394108ff1f Mon Sep 17 00:00:00 2001 From: tombokombo Date: Mon, 12 Sep 2022 14:47:48 +0200 Subject: [PATCH 6/6] fix copyright date Signed-off-by: tombokombo --- internal/ingress/controller/store/endpointslice.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/ingress/controller/store/endpointslice.go b/internal/ingress/controller/store/endpointslice.go index e7aece7c99..fdd7374e9e 100644 --- a/internal/ingress/controller/store/endpointslice.go +++ b/internal/ingress/controller/store/endpointslice.go @@ -1,5 +1,5 @@ /* -Copyright 2015 The Kubernetes Authors. +Copyright 2022 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.