Skip to content

Commit

Permalink
Add support for EndpointSlices (#3260)
Browse files Browse the repository at this point in the history
* Generate working VS config using EndpointSlices

* Update rbac to support watching endpointslices

* Add check for multiple EndpointSlices per service.

* Return all endpoints for multiple endpointslices per service

* Add initial unit test

* Add test for duplicate endpoints across two endpointslices

* Remove commented out code

* Create separate type for namespaced watchers

* Create separate type for namespaced watchers in certmanager controller

* Create separate type for namespaced watchers in extdns controller

* Fix case where namespace is not watching secrets

* Change informer

* Revert example config

* Find endpoints with endpointslices using subselectors

* Add additional unit test

* Fix linting

* Sync encpointslices for ingress and transport server

* Remove commentedcode and refactor

* Remove useage of core/v1 Endpoints

* Update test to ignore order of podEndpoints when comparing

* Revert deployment file

* Remove endpoints from rbac

* Update rbac for helm

* Fix typo

* Add tests for targetPort being 0

* Update unit tests

* Remove unused target port

* Update error message for TestGetEndpointsFromEndpointSlicesErrors

* Refactor tests for endpointslices

* Fix function names

Co-authored-by: “shaun-nx” <“[email protected]”>
Co-authored-by: Ciara Stacke <[email protected]>
  • Loading branch information
3 people authored Nov 24, 2022
1 parent 4464caf commit 169f15f
Show file tree
Hide file tree
Showing 7 changed files with 977 additions and 131 deletions.
9 changes: 8 additions & 1 deletion deployments/helm-chart/templates/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,18 @@ rules:
- watch
- list
{{- end }}
- apiGroups:
- discovery.k8s.io
resources:
- endpointslices
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
- services
- endpoints
verbs:
- get
- list
Expand Down
9 changes: 8 additions & 1 deletion deployments/rbac/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,18 @@ apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: nginx-ingress
rules:
- apiGroups:
- discovery.k8s.io
resources:
- endpointslices
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
- services
- endpoints
verbs:
- get
- list
Expand Down
157 changes: 88 additions & 69 deletions internal/k8s/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/nginxinc/kubernetes-ingress/internal/metrics/collectors"

api_v1 "k8s.io/api/core/v1"
discovery_v1 "k8s.io/api/discovery/v1"
networking "k8s.io/api/networking/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

Expand Down Expand Up @@ -332,7 +333,7 @@ type namespacedInformer struct {
dynInformerFactory dynamicinformer.DynamicSharedInformerFactory
ingressLister storeToIngressLister
svcLister cache.Store
endpointLister storeToEndpointLister
endpointSliceLister storeToEndpointSliceLister
podLister indexerToPodLister
secretLister cache.Store
virtualServerLister cache.Store
Expand All @@ -359,7 +360,7 @@ func (lbc *LoadBalancerController) newNamespacedInformer(ns string) {
// create handlers for resources we care about
lbc.addIngressHandler(createIngressHandlers(lbc), nsi)
lbc.addServiceHandler(createServiceHandlers(lbc), nsi)
lbc.addEndpointHandler(createEndpointHandlers(lbc), nsi)
lbc.addEndpointSliceHandler(createEndpointSliceHandlers(lbc), nsi)
lbc.addPodHandler(nsi)

secretsTweakListOptionsFunc := func(options *meta_v1.ListOptions) {
Expand Down Expand Up @@ -512,13 +513,13 @@ func (lbc *LoadBalancerController) addIngressHandler(handlers cache.ResourceEven
lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced)
}

// addEndpointHandler adds the handler for endpoints to the controller
func (lbc *LoadBalancerController) addEndpointHandler(handlers cache.ResourceEventHandlerFuncs, nsi *namespacedInformer) {
informer := nsi.sharedInformerFactory.Core().V1().Endpoints().Informer()
// addEndpointSliceHandler adds the handler for EndpointSlices to the controller
func (lbc *LoadBalancerController) addEndpointSliceHandler(handlers cache.ResourceEventHandlerFuncs, nsi *namespacedInformer) {
informer := nsi.sharedInformerFactory.Discovery().V1().EndpointSlices().Informer()
informer.AddEventHandler(handlers)
var el storeToEndpointLister
var el storeToEndpointSliceLister
el.Store = informer.GetStore()
nsi.endpointLister = el
nsi.endpointSliceLister = el

lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced)
}
Expand Down Expand Up @@ -697,60 +698,60 @@ func (lbc *LoadBalancerController) getNamespacedInformer(ns string) *namespacedI
return nsi
}

func (lbc *LoadBalancerController) syncEndpoints(task task) {
func (lbc *LoadBalancerController) syncEndpointSlices(task task) {
key := task.Key
var obj interface{}
var endpExists bool
var endpointSliceExists bool
var err error
glog.V(3).Infof("Syncing endpoints %v", key)
glog.V(3).Infof("Syncing EndpointSlices %v", key)

ns, _, _ := cache.SplitMetaNamespaceKey(key)
obj, endpExists, err = lbc.getNamespacedInformer(ns).endpointLister.GetByKey(key)
obj, endpointSliceExists, err = lbc.getNamespacedInformer(ns).endpointSliceLister.GetByKey(key)

if err != nil {
lbc.syncQueue.Requeue(task, err)
return
}

if !endpExists {
if !endpointSliceExists {
return
}

endp := obj.(*api_v1.Endpoints)
resources := lbc.configuration.FindResourcesForEndpoints(endp.Namespace, endp.Name)
endpointSlice := obj.(*discovery_v1.EndpointSlice)
svcResource := lbc.configuration.FindResourcesForService(endpointSlice.Namespace, endpointSlice.Labels["kubernetes.io/service-name"])

resourceExes := lbc.createExtendedResources(resources)
resourceExes := lbc.createExtendedResources(svcResource)

if len(resourceExes.IngressExes) > 0 {
glog.V(3).Infof("Updating Endpoints for %v", resourceExes.IngressExes)
glog.V(3).Infof("Updating EndpointSlices for %v", resourceExes.IngressExes)
err = lbc.configurator.UpdateEndpoints(resourceExes.IngressExes)
if err != nil {
glog.Errorf("Error updating endpoints for %v: %v", resourceExes.IngressExes, err)
glog.Errorf("Error updating EndpointSlices for %v: %v", resourceExes.IngressExes, err)
}
}

if len(resourceExes.MergeableIngresses) > 0 {
glog.V(3).Infof("Updating Endpoints for %v", resourceExes.MergeableIngresses)
glog.V(3).Infof("Updating EndpointSlices for %v", resourceExes.MergeableIngresses)
err = lbc.configurator.UpdateEndpointsMergeableIngress(resourceExes.MergeableIngresses)
if err != nil {
glog.Errorf("Error updating endpoints for %v: %v", resourceExes.MergeableIngresses, err)
glog.Errorf("Error updating EndpointSlices for %v: %v", resourceExes.MergeableIngresses, err)
}
}

if lbc.areCustomResourcesEnabled {
if len(resourceExes.VirtualServerExes) > 0 {
glog.V(3).Infof("Updating endpoints for %v", resourceExes.VirtualServerExes)
glog.V(3).Infof("Updating EndpointSlices for %v", resourceExes.VirtualServerExes)
err := lbc.configurator.UpdateEndpointsForVirtualServers(resourceExes.VirtualServerExes)
if err != nil {
glog.Errorf("Error updating endpoints for %v: %v", resourceExes.VirtualServerExes, err)
glog.Errorf("Error updating EndpointSlices for %v: %v", resourceExes.VirtualServerExes, err)
}
}

if len(resourceExes.TransportServerExes) > 0 {
glog.V(3).Infof("Updating endpoints for %v", resourceExes.TransportServerExes)
glog.V(3).Infof("Updating EndpointSlices for %v", resourceExes.TransportServerExes)
err := lbc.configurator.UpdateEndpointsForTransportServers(resourceExes.TransportServerExes)
if err != nil {
glog.Errorf("Error updating endpoints for %v: %v", resourceExes.TransportServerExes, err)
glog.Errorf("Error updating EndpointSlices for %v: %v", resourceExes.TransportServerExes, err)
}
}
}
Expand Down Expand Up @@ -905,8 +906,8 @@ func (lbc *LoadBalancerController) sync(task task) {
lbc.updateTransportServerMetrics()
case configMap:
lbc.syncConfigMap(task)
case endpoints:
lbc.syncEndpoints(task)
case endpointslice:
lbc.syncEndpointSlices(task)
case secret:
lbc.syncSecret(task)
case service:
Expand Down Expand Up @@ -3280,43 +3281,54 @@ func (lbc *LoadBalancerController) getEndpointsForServiceWithSubselector(targetP
return nil, fmt.Errorf("error getting pods in namespace %v that match the selector %v: %w", svc.Namespace, labels.Merge(svc.Spec.Selector, subselector), err)
}

var svcEps api_v1.Endpoints
svcEps, err = nsi.endpointLister.GetServiceEndpoints(svc)
var svcEndpointSlices []discovery_v1.EndpointSlice
svcEndpointSlices, err = nsi.endpointSliceLister.GetServiceEndpointSlices(svc)
if err != nil {
glog.V(3).Infof("Error getting endpoints for service %s from the cache: %v", svc.Name, err)
glog.V(3).Infof("Error getting endpointslices for service %s from the cache: %v", svc.Name, err)
return nil, err
}

endps = getEndpointsBySubselectedPods(targetPort, pods, svcEps)
endps = getEndpointsFromEndpointSlicesForSubselectedPods(targetPort, pods, svcEndpointSlices)
return endps, nil
}

func getEndpointsBySubselectedPods(targetPort int32, pods []*api_v1.Pod, svcEps api_v1.Endpoints) (endps []podEndpoint) {
func getEndpointsFromEndpointSlicesForSubselectedPods(targetPort int32, pods []*api_v1.Pod, svcEndpointSlices []discovery_v1.EndpointSlice) (podEndpoints []podEndpoint) {
endpointSet := make(map[podEndpoint]struct{})
for _, pod := range pods {
for _, subset := range svcEps.Subsets {
for _, port := range subset.Ports {
if port.Port != targetPort {
for _, endpointSlice := range svcEndpointSlices {
for _, port := range endpointSlice.Ports {
if *port.Port != targetPort {
continue
}
for _, address := range subset.Addresses {
if address.IP == pod.Status.PodIP {
addr := ipv6SafeAddrPort(pod.Status.PodIP, targetPort)
ownerType, ownerName := getPodOwnerTypeAndName(pod)
podEnd := podEndpoint{
Address: addr,
PodName: getPodName(address.TargetRef),
MeshPodOwner: configs.MeshPodOwner{
OwnerType: ownerType,
OwnerName: ownerName,
},
for _, endpoint := range endpointSlice.Endpoints {
for _, address := range endpoint.Addresses {
if pod.Status.PodIP == address {
addr := ipv6SafeAddrPort(pod.Status.PodIP, targetPort)
ownerType, ownerName := getPodOwnerTypeAndName(pod)
podEndpoint := podEndpoint{
Address: addr,
PodName: getPodName(endpoint.TargetRef),
MeshPodOwner: configs.MeshPodOwner{
OwnerType: ownerType,
OwnerName: ownerName,
},
}
endpointSet[podEndpoint] = struct{}{}
podEndpoints = append(podEndpoints, podEndpoint)
}
endps = append(endps, podEnd)
}
}
}
}
}
return endps
if len(endpointSet) == 0 {
return nil
}
endpoints := make([]podEndpoint, 0, len(endpointSet))
for ep := range endpointSet {
endpoints = append(endpoints, ep)
}
return endpoints
}

func ipv6SafeAddrPort(addr string, port int32) string {
Expand Down Expand Up @@ -3394,8 +3406,8 @@ func (lbc *LoadBalancerController) getExternalEndpointsForIngressBackend(backend
}

func (lbc *LoadBalancerController) getEndpointsForIngressBackend(backend *networking.IngressBackend, svc *api_v1.Service) (result []podEndpoint, isExternal bool, err error) {
var endps api_v1.Endpoints
endps, err = lbc.getNamespacedInformer(svc.Namespace).endpointLister.GetServiceEndpoints(svc)
var endpointSlices []discovery_v1.EndpointSlice
endpointSlices, err = lbc.getNamespacedInformer(svc.Namespace).endpointSliceLister.GetServiceEndpointSlices(svc)

if err != nil {
if svc.Spec.Type == api_v1.ServiceTypeExternalName {
Expand All @@ -3409,15 +3421,15 @@ func (lbc *LoadBalancerController) getEndpointsForIngressBackend(backend *networ
return nil, false, err
}

result, err = lbc.getEndpointsForPort(endps, backend.Service.Port, svc)
result, err = lbc.getEndpointsForPortFromEndpointSlices(endpointSlices, backend.Service.Port, svc)
if err != nil {
glog.V(3).Infof("Error getting endpoints for service %s port %v: %v", svc.Name, configs.GetBackendPortAsString(backend.Service.Port), err)
glog.V(3).Infof("Error getting endpointslices for service %s port %v: %v", svc.Name, configs.GetBackendPortAsString(backend.Service.Port), err)
return nil, false, err
}
return result, false, nil
}

func (lbc *LoadBalancerController) getEndpointsForPort(endps api_v1.Endpoints, backendPort networking.ServiceBackendPort, svc *api_v1.Service) ([]podEndpoint, error) {
func (lbc *LoadBalancerController) getEndpointsForPortFromEndpointSlices(endpointSlices []discovery_v1.EndpointSlice, backendPort networking.ServiceBackendPort, svc *api_v1.Service) ([]podEndpoint, error) {
var targetPort int32
var err error

Expand All @@ -3435,29 +3447,36 @@ func (lbc *LoadBalancerController) getEndpointsForPort(endps api_v1.Endpoints, b
return nil, fmt.Errorf("no port %v in service %s", backendPort, svc.Name)
}

for _, subset := range endps.Subsets {
for _, port := range subset.Ports {
if port.Port == targetPort {
var endpoints []podEndpoint
for _, address := range subset.Addresses {
addr := ipv6SafeAddrPort(address.IP, port.Port)
podEnd := podEndpoint{
Address: addr,
}
if address.TargetRef != nil {
parentType, parentName := lbc.getPodOwnerTypeAndNameFromAddress(address.TargetRef.Namespace, address.TargetRef.Name)
podEnd.OwnerType = parentType
podEnd.OwnerName = parentName
podEnd.PodName = address.TargetRef.Name
endpointSet := make(map[podEndpoint]struct{})
for _, endpointSlice := range endpointSlices {
for _, endpointSlicePort := range endpointSlice.Ports {
if *endpointSlicePort.Port == targetPort {
for _, endpoint := range endpointSlice.Endpoints {
for _, endpointAddress := range endpoint.Addresses {
address := ipv6SafeAddrPort(endpointAddress, *endpointSlicePort.Port)
podEndpoint := podEndpoint{
Address: address,
}
if endpoint.TargetRef != nil {
parentType, parentName := lbc.getPodOwnerTypeAndNameFromAddress(endpoint.TargetRef.Namespace, endpoint.TargetRef.Name)
podEndpoint.OwnerType = parentType
podEndpoint.OwnerName = parentName
podEndpoint.PodName = endpoint.TargetRef.Name
}
endpointSet[podEndpoint] = struct{}{}
}
endpoints = append(endpoints, podEnd)
}
return endpoints, nil
}
}
}

return nil, fmt.Errorf("no endpoints for target port %v in service %s", targetPort, svc.Name)
if len(endpointSet) == 0 {
return nil, fmt.Errorf("no endpointslices for target port %v in service %s", targetPort, svc.Name)
}
endpoints := make([]podEndpoint, 0, len(endpointSet))
for ep := range endpointSet {
endpoints = append(endpoints, ep)
}
return endpoints, nil
}

func (lbc *LoadBalancerController) getPodOwnerTypeAndNameFromAddress(ns, name string) (parentType, parentName string) {
Expand Down
Loading

0 comments on commit 169f15f

Please sign in to comment.