Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for EndpointSlices #3260

Merged
merged 39 commits into from
Nov 24, 2022
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
385bb13
Generate working VS config using EndpointSlices
Nov 9, 2022
9b3882b
Update rbac to support watching endpointslices
Nov 9, 2022
3c03f54
Add check for multiple EndpointSlices per service.
Nov 10, 2022
55ed20d
Return all endpoints for multiple endpointslices per service
Nov 10, 2022
cf4bd22
Add initial unit test
Nov 10, 2022
3e4032d
Add test for duplicate endpoints across two endpointslices
Nov 11, 2022
0d00a95
Remove commented out code
Nov 11, 2022
5e8a405
Create separate type for namespaced watchers
ciarams87 Nov 10, 2022
67c063a
Create separate type for namespaced watchers in certmanager controller
ciarams87 Nov 10, 2022
bb3ad14
Create separate type for namespaced watchers in extdns controller
ciarams87 Nov 10, 2022
7ec801c
Fix case where namespace is not watching secrets
ciarams87 Nov 10, 2022
7047045
Merge branch 'refactos-watchers' into endpoint-slices-backup
Nov 14, 2022
2397d33
Change informer
Nov 14, 2022
621c1b3
Merge branch 'main' into endpoint-slices
Nov 14, 2022
422b9ca
Revert example config
Nov 14, 2022
4585036
Find endpoints with endpointslices using subselectors
Nov 14, 2022
4afe089
Add additional unit test
Nov 14, 2022
be0c6a1
Fix linting
Nov 14, 2022
e9b88bc
Sync encpointslices for ingress and transport server
Nov 16, 2022
aef1891
Remove commentedcode and refactor
Nov 16, 2022
0b687d7
Remove useage of core/v1 Endpoints
Nov 16, 2022
c0381d9
Update test to ignore order of podEndpoints when comparing
Nov 16, 2022
64bc004
Merge branch 'main' into endpoint-slices
Nov 16, 2022
435325b
Revert deployment file
Nov 16, 2022
a27324d
Remove endpoints from rbac
Nov 16, 2022
b10c8e4
Update rbac for helm
Nov 16, 2022
4973890
Merge branch 'main' into endpoint-slices
shaun-nx Nov 18, 2022
a4a0201
Fix typo
Nov 21, 2022
a5ddf04
Add tests for targetPort being 0
Nov 22, 2022
8b0f615
Update unit tests
Nov 22, 2022
aef788f
Merge branch 'main' into endpoint-slices
Nov 22, 2022
ff0aaf3
Remove unused target port
Nov 22, 2022
78929a5
Update error message for TestGetEndpointsFromEndpointSlicesErrors
Nov 22, 2022
5743617
Refactor tests for endpointslices
Nov 22, 2022
9dba780
Merge branch 'main' into endpoint-slices
Nov 22, 2022
a07d4ab
Fix function names
Nov 22, 2022
721a884
Merge branch 'main' into endpoint-slices
Nov 23, 2022
b9b4019
Merge branch 'main' into endpoint-slices
shaun-nx Nov 23, 2022
704dcba
Merge branch 'main' into endpoint-slices
shaun-nx Nov 24, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion deployments/helm-chart/templates/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,18 @@ rules:
- watch
- list
{{- end }}
- apiGroups:
- discovery.k8s.io
resources:
- endpointslices
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
- services
- endpoints
verbs:
- get
- list
Expand Down
9 changes: 8 additions & 1 deletion deployments/rbac/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,18 @@ apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: nginx-ingress
rules:
shaun-nx marked this conversation as resolved.
Show resolved Hide resolved
- apiGroups:
- discovery.k8s.io
resources:
- endpointslices
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
- services
- endpoints
verbs:
- get
- list
Expand Down
157 changes: 88 additions & 69 deletions internal/k8s/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/nginxinc/kubernetes-ingress/internal/metrics/collectors"

api_v1 "k8s.io/api/core/v1"
discovery_v1 "k8s.io/api/discovery/v1"
networking "k8s.io/api/networking/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

Expand Down Expand Up @@ -332,7 +333,7 @@ type namespacedInformer struct {
dynInformerFactory dynamicinformer.DynamicSharedInformerFactory
ingressLister storeToIngressLister
svcLister cache.Store
endpointLister storeToEndpointLister
endpointSliceLister storeToEndpointSliceLister
podLister indexerToPodLister
secretLister cache.Store
virtualServerLister cache.Store
Expand All @@ -359,7 +360,7 @@ func (lbc *LoadBalancerController) newNamespacedInformer(ns string) {
// create handlers for resources we care about
lbc.addIngressHandler(createIngressHandlers(lbc), nsi)
lbc.addServiceHandler(createServiceHandlers(lbc), nsi)
lbc.addEndpointHandler(createEndpointHandlers(lbc), nsi)
lbc.addEndpointSliceHandler(createEndpointSliceHandlers(lbc), nsi)
lbc.addPodHandler(nsi)

secretsTweakListOptionsFunc := func(options *meta_v1.ListOptions) {
Expand Down Expand Up @@ -512,13 +513,13 @@ func (lbc *LoadBalancerController) addIngressHandler(handlers cache.ResourceEven
lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced)
}

// addEndpointHandler adds the handler for endpoints to the controller
func (lbc *LoadBalancerController) addEndpointHandler(handlers cache.ResourceEventHandlerFuncs, nsi *namespacedInformer) {
informer := nsi.sharedInformerFactory.Core().V1().Endpoints().Informer()
// addEndpointSliceHandler adds the handler for EndpointSlices to the controller
func (lbc *LoadBalancerController) addEndpointSliceHandler(handlers cache.ResourceEventHandlerFuncs, nsi *namespacedInformer) {
informer := nsi.sharedInformerFactory.Discovery().V1().EndpointSlices().Informer()
informer.AddEventHandler(handlers)
var el storeToEndpointLister
var el storeToEndpointSliceLister
el.Store = informer.GetStore()
nsi.endpointLister = el
nsi.endpointSliceLister = el

lbc.cacheSyncs = append(lbc.cacheSyncs, informer.HasSynced)
}
Expand Down Expand Up @@ -697,60 +698,60 @@ func (lbc *LoadBalancerController) getNamespacedInformer(ns string) *namespacedI
return nsi
}

func (lbc *LoadBalancerController) syncEndpoints(task task) {
func (lbc *LoadBalancerController) syncEndpointSlices(task task) {
key := task.Key
var obj interface{}
var endpExists bool
var endpointSliceExists bool
var err error
glog.V(3).Infof("Syncing endpoints %v", key)
glog.V(3).Infof("Syncing EndpointSlices %v", key)

ns, _, _ := cache.SplitMetaNamespaceKey(key)
obj, endpExists, err = lbc.getNamespacedInformer(ns).endpointLister.GetByKey(key)
obj, endpointSliceExists, err = lbc.getNamespacedInformer(ns).endpointSliceLister.GetByKey(key)

if err != nil {
lbc.syncQueue.Requeue(task, err)
return
}

if !endpExists {
if !endpointSliceExists {
return
}

endp := obj.(*api_v1.Endpoints)
resources := lbc.configuration.FindResourcesForEndpoints(endp.Namespace, endp.Name)
endpointSlice := obj.(*discovery_v1.EndpointSlice)
svcResource := lbc.configuration.FindResourcesForService(endpointSlice.Namespace, endpointSlice.Labels["kubernetes.io/service-name"])

resourceExes := lbc.createExtendedResources(resources)
resourceExes := lbc.createExtendedResources(svcResource)

if len(resourceExes.IngressExes) > 0 {
glog.V(3).Infof("Updating Endpoints for %v", resourceExes.IngressExes)
glog.V(3).Infof("Updating EndpointSlices for %v", resourceExes.IngressExes)
err = lbc.configurator.UpdateEndpoints(resourceExes.IngressExes)
if err != nil {
glog.Errorf("Error updating endpoints for %v: %v", resourceExes.IngressExes, err)
glog.Errorf("Error updating EndpointSlices for %v: %v", resourceExes.IngressExes, err)
}
}

if len(resourceExes.MergeableIngresses) > 0 {
glog.V(3).Infof("Updating Endpoints for %v", resourceExes.MergeableIngresses)
glog.V(3).Infof("Updating EndpointSlices for %v", resourceExes.MergeableIngresses)
err = lbc.configurator.UpdateEndpointsMergeableIngress(resourceExes.MergeableIngresses)
if err != nil {
glog.Errorf("Error updating endpoints for %v: %v", resourceExes.MergeableIngresses, err)
glog.Errorf("Error updating EndpointSlices for %v: %v", resourceExes.MergeableIngresses, err)
}
}

if lbc.areCustomResourcesEnabled {
if len(resourceExes.VirtualServerExes) > 0 {
glog.V(3).Infof("Updating endpoints for %v", resourceExes.VirtualServerExes)
glog.V(3).Infof("Updating EndpointSlices for %v", resourceExes.VirtualServerExes)
err := lbc.configurator.UpdateEndpointsForVirtualServers(resourceExes.VirtualServerExes)
if err != nil {
glog.Errorf("Error updating endpoints for %v: %v", resourceExes.VirtualServerExes, err)
glog.Errorf("Error updating EndpointSlices for %v: %v", resourceExes.VirtualServerExes, err)
}
}

if len(resourceExes.TransportServerExes) > 0 {
glog.V(3).Infof("Updating endpoints for %v", resourceExes.TransportServerExes)
glog.V(3).Infof("Updating EndpointSlices for %v", resourceExes.TransportServerExes)
err := lbc.configurator.UpdateEndpointsForTransportServers(resourceExes.TransportServerExes)
if err != nil {
glog.Errorf("Error updating endpoints for %v: %v", resourceExes.TransportServerExes, err)
glog.Errorf("Error updating EndpointSlices for %v: %v", resourceExes.TransportServerExes, err)
}
}
}
Expand Down Expand Up @@ -905,8 +906,8 @@ func (lbc *LoadBalancerController) sync(task task) {
lbc.updateTransportServerMetrics()
case configMap:
lbc.syncConfigMap(task)
case endpoints:
lbc.syncEndpoints(task)
case endpointslice:
lbc.syncEndpointSlices(task)
case secret:
lbc.syncSecret(task)
case service:
Expand Down Expand Up @@ -3280,43 +3281,54 @@ func (lbc *LoadBalancerController) getEndpointsForServiceWithSubselector(targetP
return nil, fmt.Errorf("error getting pods in namespace %v that match the selector %v: %w", svc.Namespace, labels.Merge(svc.Spec.Selector, subselector), err)
}

var svcEps api_v1.Endpoints
svcEps, err = nsi.endpointLister.GetServiceEndpoints(svc)
var svcEndpointSlices []discovery_v1.EndpointSlice
svcEndpointSlices, err = nsi.endpointSliceLister.GetServiceEndpointSlices(svc)
if err != nil {
glog.V(3).Infof("Error getting endpoints for service %s from the cache: %v", svc.Name, err)
glog.V(3).Infof("Error getting endpointslices for service %s from the cache: %v", svc.Name, err)
return nil, err
}

endps = getEndpointsBySubselectedPods(targetPort, pods, svcEps)
endps = getEndpointsFromEndpointSlicesForSubselectedPods(targetPort, pods, svcEndpointSlices)
return endps, nil
}

func getEndpointsBySubselectedPods(targetPort int32, pods []*api_v1.Pod, svcEps api_v1.Endpoints) (endps []podEndpoint) {
func getEndpointsFromEndpointSlicesForSubselectedPods(targetPort int32, pods []*api_v1.Pod, svcEndpointSlices []discovery_v1.EndpointSlice) (podEndpoints []podEndpoint) {
endpointSet := make(map[podEndpoint]struct{})
for _, pod := range pods {
for _, subset := range svcEps.Subsets {
for _, port := range subset.Ports {
if port.Port != targetPort {
for _, endpointSlice := range svcEndpointSlices {
for _, port := range endpointSlice.Ports {
if *port.Port != targetPort {
continue
}
for _, address := range subset.Addresses {
if address.IP == pod.Status.PodIP {
addr := ipv6SafeAddrPort(pod.Status.PodIP, targetPort)
ownerType, ownerName := getPodOwnerTypeAndName(pod)
podEnd := podEndpoint{
Address: addr,
PodName: getPodName(address.TargetRef),
MeshPodOwner: configs.MeshPodOwner{
OwnerType: ownerType,
OwnerName: ownerName,
},
for _, endpoint := range endpointSlice.Endpoints {
for _, address := range endpoint.Addresses {
if pod.Status.PodIP == address {
addr := ipv6SafeAddrPort(pod.Status.PodIP, targetPort)
ownerType, ownerName := getPodOwnerTypeAndName(pod)
podEndpoint := podEndpoint{
Address: addr,
PodName: getPodName(endpoint.TargetRef),
MeshPodOwner: configs.MeshPodOwner{
OwnerType: ownerType,
OwnerName: ownerName,
},
}
endpointSet[podEndpoint] = struct{}{}
podEndpoints = append(podEndpoints, podEndpoint)
}
endps = append(endps, podEnd)
}
}
}
}
}
return endps
if len(endpointSet) == 0 {
return nil
}
endpoints := make([]podEndpoint, 0, len(endpointSet))
for ep := range endpointSet {
endpoints = append(endpoints, ep)
}
return endpoints
}

func ipv6SafeAddrPort(addr string, port int32) string {
Expand Down Expand Up @@ -3394,8 +3406,8 @@ func (lbc *LoadBalancerController) getExternalEndpointsForIngressBackend(backend
}

func (lbc *LoadBalancerController) getEndpointsForIngressBackend(backend *networking.IngressBackend, svc *api_v1.Service) (result []podEndpoint, isExternal bool, err error) {
var endps api_v1.Endpoints
endps, err = lbc.getNamespacedInformer(svc.Namespace).endpointLister.GetServiceEndpoints(svc)
var endpointSlices []discovery_v1.EndpointSlice
endpointSlices, err = lbc.getNamespacedInformer(svc.Namespace).endpointSliceLister.GetServiceEndpointSlices(svc)

if err != nil {
if svc.Spec.Type == api_v1.ServiceTypeExternalName {
Expand All @@ -3409,15 +3421,15 @@ func (lbc *LoadBalancerController) getEndpointsForIngressBackend(backend *networ
return nil, false, err
}

result, err = lbc.getEndpointsForPort(endps, backend.Service.Port, svc)
result, err = lbc.getEndpointsForPortFromEndpointSlices(endpointSlices, backend.Service.Port, svc)
if err != nil {
glog.V(3).Infof("Error getting endpoints for service %s port %v: %v", svc.Name, configs.GetBackendPortAsString(backend.Service.Port), err)
glog.V(3).Infof("Error getting endpointslices for service %s port %v: %v", svc.Name, configs.GetBackendPortAsString(backend.Service.Port), err)
return nil, false, err
}
return result, false, nil
}

func (lbc *LoadBalancerController) getEndpointsForPort(endps api_v1.Endpoints, backendPort networking.ServiceBackendPort, svc *api_v1.Service) ([]podEndpoint, error) {
func (lbc *LoadBalancerController) getEndpointsForPortFromEndpointSlices(endpointSlices []discovery_v1.EndpointSlice, backendPort networking.ServiceBackendPort, svc *api_v1.Service) ([]podEndpoint, error) {
var targetPort int32
var err error

Expand All @@ -3435,29 +3447,36 @@ func (lbc *LoadBalancerController) getEndpointsForPort(endps api_v1.Endpoints, b
return nil, fmt.Errorf("no port %v in service %s", backendPort, svc.Name)
}

for _, subset := range endps.Subsets {
for _, port := range subset.Ports {
if port.Port == targetPort {
var endpoints []podEndpoint
for _, address := range subset.Addresses {
addr := ipv6SafeAddrPort(address.IP, port.Port)
podEnd := podEndpoint{
Address: addr,
}
if address.TargetRef != nil {
parentType, parentName := lbc.getPodOwnerTypeAndNameFromAddress(address.TargetRef.Namespace, address.TargetRef.Name)
podEnd.OwnerType = parentType
podEnd.OwnerName = parentName
podEnd.PodName = address.TargetRef.Name
endpointSet := make(map[podEndpoint]struct{})
for _, endpointSlice := range endpointSlices {
for _, endpointSlicePort := range endpointSlice.Ports {
if *endpointSlicePort.Port == targetPort {
for _, endpoint := range endpointSlice.Endpoints {
for _, endpointAddress := range endpoint.Addresses {
address := ipv6SafeAddrPort(endpointAddress, *endpointSlicePort.Port)
podEndpoint := podEndpoint{
Address: address,
}
if endpoint.TargetRef != nil {
parentType, parentName := lbc.getPodOwnerTypeAndNameFromAddress(endpoint.TargetRef.Namespace, endpoint.TargetRef.Name)
podEndpoint.OwnerType = parentType
podEndpoint.OwnerName = parentName
podEndpoint.PodName = endpoint.TargetRef.Name
}
endpointSet[podEndpoint] = struct{}{}
}
endpoints = append(endpoints, podEnd)
}
return endpoints, nil
}
}
}

return nil, fmt.Errorf("no endpoints for target port %v in service %s", targetPort, svc.Name)
if len(endpointSet) == 0 {
return nil, fmt.Errorf("no endpointslices for target port %v in service %s", targetPort, svc.Name)
}
endpoints := make([]podEndpoint, 0, len(endpointSet))
for ep := range endpointSet {
endpoints = append(endpoints, ep)
}
return endpoints, nil
}

func (lbc *LoadBalancerController) getPodOwnerTypeAndNameFromAddress(ns, name string) (parentType, parentName string) {
Expand Down
Loading