diff --git a/controller/api/destination/watcher/endpoints_watcher.go b/controller/api/destination/watcher/endpoints_watcher.go index 7e8a4257d688f..32f71baa4c98d 100644 --- a/controller/api/destination/watcher/endpoints_watcher.go +++ b/controller/api/destination/watcher/endpoints_watcher.go @@ -7,11 +7,13 @@ import ( "sync" "github.com/linkerd/linkerd2/controller/k8s" - consts "github.com/linkerd/linkerd2/pkg/k8s" + pkgK8s "github.com/linkerd/linkerd2/pkg/k8s" "github.com/prometheus/client_golang/prometheus" logging "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1beta1" apierrors "k8s.io/apimachinery/pkg/api/errors" + k8slabels "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/tools/cache" ) @@ -122,7 +124,9 @@ type ( var endpointsVecs = newEndpointsMetricsVecs() // NewEndpointsWatcher creates an EndpointsWatcher and begins watching the -// k8sAPI for pod, service, and endpoint changes. +// k8sAPI for pod, service, and endpoint changes. An EndpointsWatcher will +// watch on Endpoints or EndpointSlice resources, depending on cluster configuration. +//TODO: Allow EndpointSlice resources to be used once opt-in functionality is supported. func NewEndpointsWatcher(k8sAPI *k8s.API, log *logging.Entry) *EndpointsWatcher { ew := &EndpointsWatcher{ publishers: make(map[ServiceID]*servicePublisher), @@ -145,12 +149,22 @@ func NewEndpointsWatcher(k8sAPI *k8s.API, log *logging.Entry) *EndpointsWatcher UpdateFunc: func(_, obj interface{}) { ew.addService(obj) }, }) - k8sAPI.Endpoint().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: ew.addEndpoints, - DeleteFunc: ew.deleteEndpoints, - UpdateFunc: func(_, obj interface{}) { ew.addEndpoints(obj) }, - }) - + if !pkgK8s.EndpointSliceAccess(k8sAPI.Client) { + // ew.log.Debugf("Cluster does not have EndpointSlice access:%v", err) + ew.log.Debugf("Watching Endpoints resources") + k8sAPI.Endpoint().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: ew.addEndpoints, + DeleteFunc: ew.deleteEndpoints, + UpdateFunc: func(_, obj interface{}) { ew.addEndpoints(obj) }, + }) + } else { + ew.log.Debugf("Watching EndpointSlice resources") + k8sAPI.ES().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: ew.addEndpointSlice, + DeleteFunc: ew.deleteEndpointSlice, + UpdateFunc: ew.updateEndpointSlice, + }) + } return ew } @@ -240,17 +254,17 @@ func (ew *EndpointsWatcher) deleteService(obj interface{}) { } func (ew *EndpointsWatcher) addEndpoints(obj interface{}) { - endpoints := obj.(*corev1.Endpoints) - if endpoints.Namespace == kubeSystem { + endpoints, ok := obj.(*corev1.Endpoints) + if !ok { + ew.log.Errorf("error processing endpoints resource, got %#v expected *corev1.Endpoints", obj) return } - id := ServiceID{ - Namespace: endpoints.Namespace, - Name: endpoints.Name, - } + if endpoints.Namespace == kubeSystem { + return + } + id := ServiceID{endpoints.Namespace, endpoints.Name} sp := ew.getOrNewServicePublisher(id) - sp.updateEndpoints(endpoints) } @@ -283,6 +297,84 @@ func (ew *EndpointsWatcher) deleteEndpoints(obj interface{}) { } } +func (ew *EndpointsWatcher) addEndpointSlice(obj interface{}) { + newSlice, ok := obj.(*discovery.EndpointSlice) + if !ok { + ew.log.Errorf("error processing EndpointSlice resource, got %#v expected *discovery.EndpointSlice", obj) + return + } + + if newSlice.Namespace == kubeSystem { + return + } + + id, err := getEndpointSliceServiceID(newSlice) + if err != nil { + ew.log.Errorf("Could not fetch resource service name:%v", err) + return + } + + sp := ew.getOrNewServicePublisher(id) + sp.addEndpointSlice(newSlice) +} + +func (ew *EndpointsWatcher) updateEndpointSlice(oldObj interface{}, newObj interface{}) { + oldSlice, ok := oldObj.(*discovery.EndpointSlice) + if !ok { + ew.log.Errorf("error processing EndpointSlice resource, got %#v expected *discovery.EndpointSlice", oldObj) + return + } + newSlice, ok := newObj.(*discovery.EndpointSlice) + if !ok { + ew.log.Errorf("error processing EndpointSlice resource, got %#v expected *discovery.EndpointSlice", newObj) + return + } + + if newSlice.Namespace == kubeSystem { + return + } + + id, err := getEndpointSliceServiceID(newSlice) + if err != nil { + ew.log.Errorf("Could not fetch resource service name:%v", err) + return + } + + sp, ok := ew.getServicePublisher(id) + if ok { + sp.updateEndpointSlice(oldSlice, newSlice) + } +} + +func (ew *EndpointsWatcher) deleteEndpointSlice(obj interface{}) { + es, ok := obj.(*discovery.EndpointSlice) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + ew.log.Errorf("couldn't get object from DeletedFinalStateUnknown %#v", obj) + } + es, ok = tombstone.Obj.(*discovery.EndpointSlice) + if !ok { + ew.log.Errorf("DeletedFinalStateUnknown contained object that is not an EndpointSlice %#v", obj) + return + } + } + + if es.Namespace == kubeSystem { + return + } + + id, err := getEndpointSliceServiceID(es) + if err != nil { + ew.log.Errorf("Could not fetch resource service name:%v", err) + } + + sp, ok := ew.getServicePublisher(id) + if ok { + sp.deleteEndpointSlice(es) + } +} + // Returns the servicePublisher for the given id if it exists. Otherwise, // create a new one and return it. func (ew *EndpointsWatcher) getOrNewServicePublisher(id ServiceID) *servicePublisher { @@ -323,7 +415,6 @@ func (sp *servicePublisher) updateEndpoints(newEndpoints *corev1.Endpoints) { sp.Lock() defer sp.Unlock() sp.log.Debugf("Updating endpoints for %s", sp.id) - for _, port := range sp.ports { port.updateEndpoints(newEndpoints) } @@ -333,12 +424,38 @@ func (sp *servicePublisher) deleteEndpoints() { sp.Lock() defer sp.Unlock() sp.log.Debugf("Deleting endpoints for %s", sp.id) - for _, port := range sp.ports { port.noEndpoints(false) } } +func (sp *servicePublisher) addEndpointSlice(newSlice *discovery.EndpointSlice) { + sp.Lock() + defer sp.Unlock() + sp.log.Debugf("Adding EndpointSlice for %s", sp.id) + for _, port := range sp.ports { + port.addEndpointSlice(newSlice) + } +} + +func (sp *servicePublisher) updateEndpointSlice(oldSlice *discovery.EndpointSlice, newSlice *discovery.EndpointSlice) { + sp.Lock() + defer sp.Unlock() + sp.log.Debugf("Updating EndpointSlice for %s", sp.id) + for _, port := range sp.ports { + port.updateEndpointSlice(oldSlice, newSlice) + } +} + +func (sp *servicePublisher) deleteEndpointSlice(es *discovery.EndpointSlice) { + sp.Lock() + defer sp.Unlock() + sp.log.Debugf("Deleting EndpointSlice for %s", sp.id) + for _, port := range sp.ports { + port.deleteEndpointSlice(es) + } +} + func (sp *servicePublisher) updateService(newService *corev1.Service) { sp.Lock() defer sp.Unlock() @@ -411,12 +528,29 @@ func (sp *servicePublisher) newPortPublisher(srcPort Port, hostname string) *por metrics: endpointsVecs.newEndpointsMetrics(sp.metricsLabels(srcPort, hostname)), } - endpoints, err := sp.k8sAPI.Endpoint().Lister().Endpoints(sp.id.Namespace).Get(sp.id.Name) - if err != nil && !apierrors.IsNotFound(err) { - sp.log.Errorf("error getting endpoints: %s", err) - } - if err == nil { - port.updateEndpoints(endpoints) + if !pkgK8s.EndpointSliceAccess(sp.k8sAPI.Client) { + // sp.log.Debugf("No EndpointSlice access, using endpoints:%v", err) + endpoints, err := sp.k8sAPI.Endpoint().Lister().Endpoints(sp.id.Namespace).Get(sp.id.Name) + if err != nil && !apierrors.IsNotFound(err) { + sp.log.Errorf("error getting endpoints: %s", err) + } + if err == nil { + port.updateEndpoints(endpoints) + } + } else { + sp.log.Debugf("Using EndpointSlice") + matchLabels := map[string]string{discovery.LabelServiceName: sp.id.Name} + selector := k8slabels.Set(matchLabels).AsSelector() + + sliceList, err := sp.k8sAPI.ES().Lister().EndpointSlices(sp.id.Namespace).List(selector) + if err != nil && !apierrors.IsNotFound(err) { + sp.log.Errorf("error getting endpointSlice list: %s", err) + } + if err == nil { + for _, slice := range sliceList { + port.addEndpointSlice(slice) + } + } } return port @@ -451,21 +585,92 @@ func (pp *portPublisher) updateEndpoints(endpoints *corev1.Endpoints) { } } } + pp.addresses = newAddressSet pp.exists = true + pp.metrics.incUpdates() + pp.metrics.setPods(len(pp.addresses.Addresses)) + pp.metrics.setExists(true) +} + +func (pp *portPublisher) addEndpointSlice(slice *discovery.EndpointSlice) { + newAddressSet := pp.endpointSliceToAddresses(slice) + for id, addr := range pp.addresses.Addresses { + newAddressSet.Addresses[id] = addr + } + + add, _ := diffAddresses(pp.addresses, newAddressSet) + if len(add.Addresses) > 0 { + for _, listener := range pp.listeners { + listener.Add(add) + } + } + pp.addresses = newAddressSet + pp.exists = true + pp.metrics.incUpdates() + pp.metrics.setPods(len(pp.addresses.Addresses)) + pp.metrics.setExists(true) +} + +func (pp *portPublisher) updateEndpointSlice(oldSlice *discovery.EndpointSlice, newSlice *discovery.EndpointSlice) { + updatedAddressSet := AddressSet{ + Addresses: make(map[ID]Address), + Labels: pp.addresses.Labels, + } + + for id, address := range pp.addresses.Addresses { + updatedAddressSet.Addresses[id] = address + } + + oldAddressSet := pp.endpointSliceToAddresses(oldSlice) + for id := range oldAddressSet.Addresses { + delete(updatedAddressSet.Addresses, id) + } + + newAddressSet := pp.endpointSliceToAddresses(newSlice) + for id, address := range newAddressSet.Addresses { + updatedAddressSet.Addresses[id] = address + } + + add, remove := diffAddresses(pp.addresses, updatedAddressSet) + for _, listener := range pp.listeners { + if len(remove.Addresses) > 0 { + listener.Remove(remove) + } + if len(add.Addresses) > 0 { + listener.Add(add) + } + } + pp.addresses = updatedAddressSet + pp.exists = true pp.metrics.incUpdates() pp.metrics.setPods(len(pp.addresses.Addresses)) pp.metrics.setExists(true) } -func metricLabels(endpoints *corev1.Endpoints) map[string]string { - labels := map[string]string{service: endpoints.Name, namespace: endpoints.Namespace} +func metricLabels(resource interface{}) map[string]string { + var serviceName, ns string + var resLabels, resAnnotations map[string]string + switch res := resource.(type) { + case *corev1.Endpoints: + { + serviceName, ns = res.Name, res.Namespace + resLabels, resAnnotations = res.Labels, res.Annotations + } + case *discovery.EndpointSlice: + { + serviceName, ns = res.Labels[discovery.LabelServiceName], res.Namespace + resLabels, resAnnotations = res.Labels, res.Annotations + } + } + + labels := map[string]string{service: serviceName, namespace: ns} - gateway, hasRemoteGateway := endpoints.Labels[consts.RemoteGatewayNameLabel] - gatewayNs, hasRemoteGatwayNs := endpoints.Labels[consts.RemoteGatewayNsLabel] - remoteClusterName, hasRemoteClusterName := endpoints.Labels[consts.RemoteClusterNameLabel] - serviceFqn, hasServiceFqn := endpoints.Annotations[consts.RemoteServiceFqName] + gateway, hasRemoteGateway := resLabels[pkgK8s.RemoteGatewayNameLabel] + gatewayNs, hasRemoteGatwayNs := resLabels[pkgK8s.RemoteGatewayNsLabel] + remoteClusterName, hasRemoteClusterName := resLabels[pkgK8s.RemoteClusterNameLabel] + serviceFqn, hasServiceFqn := resAnnotations[pkgK8s.RemoteServiceFqName] if hasRemoteGateway && hasRemoteGatwayNs && hasRemoteClusterName && hasServiceFqn { // this means we are looking at Endpoints created for the purpose of mirroring @@ -483,6 +688,61 @@ func metricLabels(endpoints *corev1.Endpoints) map[string]string { return labels } +func (pp *portPublisher) endpointSliceToAddresses(es *discovery.EndpointSlice) AddressSet { + addresses := make(map[ID]Address) + resolvedPort := pp.resolveESTargetPort(es.Ports) + if resolvedPort == Port(0) { + return AddressSet{} + } + serviceID, err := getEndpointSliceServiceID(es) + if err != nil { + pp.log.Errorf("Could not fetch resource service name:%v", err) + } + + for _, endpoint := range es.Endpoints { + if endpoint.Hostname != nil { + if pp.hostname != "" && pp.hostname != *endpoint.Hostname { + continue + } + } + if endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready { + continue + } + + if endpoint.TargetRef == nil { + for _, IPAddr := range endpoint.Addresses { + var authorityOverride string + if fqName, ok := es.Annotations[pkgK8s.RemoteServiceFqName]; ok { + authorityOverride = fmt.Sprintf("%s:%d", fqName, pp.srcPort) + } + + identity := es.Annotations[pkgK8s.RemoteGatewayIdentity] + address, id := pp.newServiceRefAddress(resolvedPort, IPAddr, serviceID.Name, es.Namespace) + address.Identity, address.AuthorityOverride = authorityOverride, identity + addresses[id] = address + } + + continue + } + + if endpoint.TargetRef.Kind == "Pod" { + for _, IPAddr := range endpoint.Addresses { + address, id, err := pp.newPodRefAddress(resolvedPort, IPAddr, endpoint.TargetRef.Name, endpoint.TargetRef.Namespace) + if err != nil { + pp.log.Errorf("Unable to create new address:%v", err) + continue + } + addresses[id] = address + } + } + + } + return AddressSet{ + Addresses: addresses, + Labels: metricLabels(es), + } +} + func (pp *portPublisher) endpointsToAddresses(endpoints *corev1.Endpoints) AddressSet { addresses := make(map[ID]Address) for _, subset := range endpoints.Subsets { @@ -491,47 +751,28 @@ func (pp *portPublisher) endpointsToAddresses(endpoints *corev1.Endpoints) Addre if pp.hostname != "" && pp.hostname != endpoint.Hostname { continue } - if endpoint.TargetRef == nil { - id := ServiceID{ - Name: strings.Join([]string{ - endpoints.ObjectMeta.Name, - endpoint.IP, - fmt.Sprint(resolvedPort), - }, "-"), - Namespace: endpoints.ObjectMeta.Namespace, - } + if endpoint.TargetRef == nil { var authorityOverride string - if fqName, ok := endpoints.Annotations[consts.RemoteServiceFqName]; ok { + if fqName, ok := endpoints.Annotations[pkgK8s.RemoteServiceFqName]; ok { authorityOverride = fmt.Sprintf("%s:%d", fqName, pp.srcPort) } - addresses[id] = Address{ - IP: endpoint.IP, - Port: resolvedPort, - Identity: endpoints.Annotations[consts.RemoteGatewayIdentity], - AuthorityOverride: authorityOverride, - } + identity := endpoints.Annotations[pkgK8s.RemoteGatewayIdentity] + address, id := pp.newServiceRefAddress(resolvedPort, endpoint.IP, endpoints.Name, endpoints.Namespace) + address.Identity, address.AuthorityOverride = identity, authorityOverride + + addresses[id] = address continue } + if endpoint.TargetRef.Kind == "Pod" { - id := PodID{ - Name: endpoint.TargetRef.Name, - Namespace: endpoint.TargetRef.Namespace, - } - pod, err := pp.k8sAPI.Pod().Lister().Pods(id.Namespace).Get(id.Name) + address, id, err := pp.newPodRefAddress(resolvedPort, endpoint.IP, endpoint.TargetRef.Name, endpoint.TargetRef.Namespace) if err != nil { - pp.log.Errorf("Unable to fetch pod %v: %s", id, err) + pp.log.Errorf("Unable to create new address:%v", err) continue } - ownerKind, ownerName := pp.k8sAPI.GetOwnerKindAndName(pod, false) - addresses[id] = Address{ - IP: endpoint.IP, - Port: resolvedPort, - Pod: pod, - OwnerName: ownerName, - OwnerKind: ownerKind, - } + addresses[id] = address } } } @@ -541,6 +782,58 @@ func (pp *portPublisher) endpointsToAddresses(endpoints *corev1.Endpoints) Addre } } +func (pp *portPublisher) newServiceRefAddress(endpointPort Port, endpointIP, serviceName, serviceNamespace string) (Address, ServiceID) { + id := ServiceID{ + Name: strings.Join([]string{ + serviceName, + endpointIP, + fmt.Sprint(endpointPort), + }, "-"), + Namespace: serviceNamespace, + } + + return Address{IP: endpointIP, Port: endpointPort}, id +} + +func (pp *portPublisher) newPodRefAddress(endpointPort Port, endpointIP, podName, podNamespace string) (Address, PodID, error) { + id := PodID{ + Name: podName, + Namespace: podNamespace, + } + pod, err := pp.k8sAPI.Pod().Lister().Pods(id.Namespace).Get(id.Name) + if err != nil { + return Address{}, PodID{}, fmt.Errorf("unable to fetch pod %v:%v", id, err) + } + ownerKind, ownerName := pp.k8sAPI.GetOwnerKindAndName(pod, false) + addr := Address{ + IP: endpointIP, + Port: endpointPort, + Pod: pod, + OwnerName: ownerName, + OwnerKind: ownerKind, + } + + return addr, id, nil +} + +func (pp *portPublisher) resolveESTargetPort(slicePorts []discovery.EndpointPort) Port { + if slicePorts == nil { + return Port(0) + } + + switch pp.targetPort.Type { + case intstr.Int: + return Port(pp.targetPort.IntVal) + case intstr.String: + for _, p := range slicePorts { + if *p.Name == pp.targetPort.StrVal { + return Port(*p.Port) + } + } + } + return Port(0) +} + func (pp *portPublisher) resolveTargetPort(subset corev1.EndpointSubset) Port { switch pp.targetPort.Type { case intstr.Int: @@ -557,12 +850,42 @@ func (pp *portPublisher) resolveTargetPort(subset corev1.EndpointSubset) Port { func (pp *portPublisher) updatePort(targetPort namedPort) { pp.targetPort = targetPort - endpoints, err := pp.k8sAPI.Endpoint().Lister().Endpoints(pp.id.Namespace).Get(pp.id.Name) - if err == nil { - pp.updateEndpoints(endpoints) + + if !pkgK8s.EndpointSliceAccess(pp.k8sAPI.Client) { + endpoints, err := pp.k8sAPI.Endpoint().Lister().Endpoints(pp.id.Namespace).Get(pp.id.Name) + if err == nil { + pp.updateEndpoints(endpoints) + } else { + pp.log.Errorf("Unable to get endpoints during port update: %s", err) + } } else { - pp.log.Errorf("Unable to get endpoints during port update: %s", err) + matchLabels := map[string]string{discovery.LabelServiceName: pp.id.Name} + selector := k8slabels.Set(matchLabels).AsSelector() + + endpointSlices, err := pp.k8sAPI.ES().Lister().EndpointSlices(pp.id.Namespace).List(selector) + if err == nil { + pp.addresses = AddressSet{} + for _, slice := range endpointSlices { + pp.addEndpointSlice(slice) + } + } else { + pp.log.Errorf("Unable to get EndpointSlices during port update: %s", err) + } + } +} + +func (pp *portPublisher) deleteEndpointSlice(es *discovery.EndpointSlice) { + addrSet := pp.endpointSliceToAddresses(es) + for id := range addrSet.Addresses { + delete(pp.addresses.Addresses, id) } + + for _, listener := range pp.listeners { + listener.Remove(addrSet) + } + + svcExists := len(pp.addresses.Addresses) > 0 + pp.noEndpoints(svcExists) } func (pp *portPublisher) noEndpoints(exists bool) { @@ -627,6 +950,7 @@ func getTargetPort(service *corev1.Service, port Port) namedPort { // port spec's name as the target port for _, portSpec := range service.Spec.Ports { if portSpec.Port == int32(port) { + return intstr.FromString(portSpec.Name) } } @@ -655,16 +979,16 @@ func diffAddresses(oldAddresses, newAddresses AddressSet) (add, remove AddressSe // TODO: this detects pods which have been added or removed, but does not // detect addresses which have been modified. A modified address should trigger // an add of the new version. - addAddesses := make(map[ID]Address) + addAddresses := make(map[ID]Address) removeAddresses := make(map[ID]Address) for id, newAddress := range newAddresses.Addresses { if oldAddress, ok := oldAddresses.Addresses[id]; ok { if addressChanged(oldAddress, newAddress) { - addAddesses[id] = newAddress + addAddresses[id] = newAddress } } else { // this is a new address, we need to add it - addAddesses[id] = newAddress + addAddresses[id] = newAddress } } for id, address := range oldAddresses.Addresses { @@ -673,7 +997,7 @@ func diffAddresses(oldAddresses, newAddresses AddressSet) (add, remove AddressSe } } add = AddressSet{ - Addresses: addAddesses, + Addresses: addAddresses, Labels: newAddresses.Labels, } remove = AddressSet{ @@ -681,3 +1005,32 @@ func diffAddresses(oldAddresses, newAddresses AddressSet) (add, remove AddressSe } return } + +func getEndpointSliceServiceID(es *discovery.EndpointSlice) (ServiceID, error) { + if !isValidSlice(es) { + return ServiceID{}, fmt.Errorf("EndpointSlice [%s/%s] is invalid", es.Namespace, es.Name) + } + + if svc, ok := es.Labels[discovery.LabelServiceName]; ok { + return ServiceID{es.Namespace, svc}, nil + } + + for _, ref := range es.OwnerReferences { + if ref.Kind == "Service" && ref.Name != "" { + return ServiceID{es.Namespace, ref.Name}, nil + } + } + + return ServiceID{}, fmt.Errorf("EndpointSlice [%s/%s] is invalid", es.Namespace, es.Name) +} + +func isValidSlice(es *discovery.EndpointSlice) bool { + serviceName, ok := es.Labels[discovery.LabelServiceName] + if !ok && len(es.OwnerReferences) == 0 { + return false + } else if len(es.OwnerReferences) == 0 && serviceName == "" { + return false + } + + return true +} diff --git a/controller/api/destination/watcher/endpoints_watcher_test.go b/controller/api/destination/watcher/endpoints_watcher_test.go index 3d2a5bb9cac67..2d06ca759bcc2 100644 --- a/controller/api/destination/watcher/endpoints_watcher_test.go +++ b/controller/api/destination/watcher/endpoints_watcher_test.go @@ -7,11 +7,11 @@ import ( "testing" "github.com/linkerd/linkerd2/controller/k8s" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - consts "github.com/linkerd/linkerd2/pkg/k8s" logging "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" + dv1beta1 "k8s.io/api/discovery/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) type bufferingEndpointListener struct { @@ -140,6 +140,8 @@ func (bel *bufferingEndpointListenerWithResVersion) Remove(set AddressSet) { func (bel *bufferingEndpointListenerWithResVersion) NoEndpoints(exists bool) {} +//TODO: Uncomment EndpointSlice related test cases +// once EndpointSlices are opt-in and supported func TestEndpointsWatcher(t *testing.T) { for _, tt := range []struct { serviceType string @@ -555,6 +557,583 @@ status: expectedNoEndpoints: false, expectedNoEndpointsServiceExists: false, }, + { + serviceType: "local services with EndpointSlice", + k8sConfigs: []string{` +kind: APIResourceList +apiVersion: v1 +groupVersion: discovery.k8s.io/v1beta1 +resources: + - name: endpointslices + singularName: endpointslice + namespaced: true + kind: EndpointSlice + verbs: + - delete + - deletecollection + - get + - list + - patch + - create + - update + - watch +`, ` +apiVersion: v1 +kind: Service +metadata: + name: name-1 + namespace: ns +spec: + type: LoadBalancer + ports: + - port: 8989`, + ` +addressType: IPv4 +apiVersion: discovery.k8s.io/v1beta1 +endpoints: +- addresses: + - 172.17.0.12 + conditions: + ready: true + targetRef: + kind: Pod + name: name-1-1 + namespace: ns + topology: + kubernetes.io/hostname: node-1 +- addresses: + - 172.17.0.19 + conditions: + ready: true + targetRef: + kind: Pod + name: name-1-2 + namespace: ns + topology: + kubernetes.io/hostname: node-1 +- addresses: + - 172.17.0.20 + conditions: + ready: true + targetRef: + kind: Pod + name: name-1-3 + namespace: ns + topology: + kubernetes.io/hostname: node-2 +- addresses: + - 172.17.0.21 + conditions: + ready: true + topology: + kubernetes.io/hostname: node-2 +kind: EndpointSlice +metadata: + labels: + kubernetes.io/service-name: name-1 + name: name-1-bhnqh + namespace: ns +ports: +- name: "" + port: 8989`, + ` +apiVersion: v1 +kind: Pod +metadata: + name: name-1-1 + namespace: ns + ownerReferences: + - kind: ReplicaSet + name: rs-1 +status: + phase: Running + podIP: 172.17.0.12`, + ` +apiVersion: v1 +kind: Pod +metadata: + name: name-1-2 + namespace: ns + ownerReferences: + - kind: ReplicaSet + name: rs-1 +status: + phase: Running + podIP: 172.17.0.19`, + ` +apiVersion: v1 +kind: Pod +metadata: + name: name-1-3 + namespace: ns + ownerReferences: + - kind: ReplicaSet + name: rs-1 +status: + phase: Running + podIP: 172.17.0.20`, + }, + id: ServiceID{Name: "name-1", Namespace: "ns"}, + port: 8989, + // expectedAddresses: []string{ + // "172.17.0.12:8989", + // "172.17.0.19:8989", + // "172.17.0.20:8989", + // "172.17.0.21:8989", + // }, + expectedAddresses: []string{}, + //expectedNoEndpoints: false, + expectedNoEndpoints: true, + //expectedNoEndpointsServiceExists: false, + expectedNoEndpointsServiceExists: true, + expectedError: false, + }, + { + serviceType: "local services with missing addresses and EndpointSlice", + k8sConfigs: []string{` +kind: APIResourceList +apiVersion: v1 +groupVersion: discovery.k8s.io/v1beta1 +resources: + - name: endpointslices + singularName: endpointslice + namespaced: true + kind: EndpointSlice + verbs: + - delete + - deletecollection + - get + - list + - patch + - create + - update + - watch +`, ` +apiVersion: v1 +kind: Service +metadata: + name: name-1 + namespace: ns +spec: + type: LoadBalancer + ports: + - port: 8989`, ` +addressType: IPv4 +apiVersion: discovery.k8s.io/v1beta1 +endpoints: +- addresses: + - 172.17.0.23 + conditions: + ready: true + targetRef: + kind: Pod + name: name-1-1 + namespace: ns + topology: + kubernetes.io/hostname: node-1 +- addresses: + - 172.17.0.24 + conditions: + ready: true + targetRef: + kind: Pod + name: name-1-2 + namespace: ns + topology: + kubernetes.io/hostname: node-1 +- addresses: + - 172.17.0.25 + conditions: + ready: true + targetRef: + kind: Pod + name: name-1-3 + namespace: ns + topology: + kubernetes.io/hostname: node-2 +kind: EndpointSlice +metadata: + labels: + kubernetes.io/service-name: name-1 + name: name1-f5fad + namespace: ns +ports: +- name: "" + port: 8989`, ` +apiVersion: v1 +kind: Pod +metadata: + name: name-1-3 + namespace: ns + ownerReferences: + - kind: ReplicaSet + name: rs-1 +status: + podIP: 172.17.0.25 + phase: Running`, + }, + id: ServiceID{Name: "name-1", Namespace: "ns"}, + port: 8989, + //expectedAddresses: []string{"172.17.0.25:8989"}, + expectedAddresses: []string{}, + //expectedNoEndpoints: false, + expectedNoEndpoints: true, + //expectedNoEndpointsServiceExists: false, + expectedNoEndpointsServiceExists: true, + expectedError: false, + }, + { + serviceType: "local services with no EndpointSlices", + k8sConfigs: []string{` +kind: APIResourceList +apiVersion: v1 +groupVersion: discovery.k8s.io/v1beta1 +resources: + - name: endpointslices + singularName: endpointslice + namespaced: true + kind: EndpointSlice + verbs: + - delete + - deletecollection + - get + - list + - patch + - create + - update + - watch +`, ` +apiVersion: v1 +kind: Service +metadata: + name: name-2 + namespace: ns +spec: + type: LoadBalancer + ports: + - port: 7979`, + }, + id: ServiceID{Name: "name-2", Namespace: "ns"}, + port: 7979, + expectedAddresses: []string{}, + expectedNoEndpoints: true, + expectedNoEndpointsServiceExists: true, + expectedError: false, + }, + { + serviceType: "external name services with EndpointSlices", + k8sConfigs: []string{` +kind: APIResourceList +apiVersion: v1 +groupVersion: discovery.k8s.io/v1beta1 +resources: + - name: endpointslices + singularName: endpointslice + namespaced: true + kind: EndpointSlice + verbs: + - delete + - deletecollection + - get + - list + - patch + - create + - update + - watch +`, ` +apiVersion: v1 +kind: Service +metadata: + name: name-3-external-svc + namespace: ns +spec: + type: ExternalName + externalName: foo`, + }, + id: ServiceID{Name: "name-3-external-svc", Namespace: "ns"}, + port: 7777, + expectedAddresses: []string{}, + expectedNoEndpoints: false, + expectedNoEndpointsServiceExists: false, + expectedError: true, + }, + { + serviceType: "services that do not exist", + k8sConfigs: []string{}, + id: ServiceID{Name: "name-4-inexistent-svc", Namespace: "ns"}, + port: 5555, + expectedAddresses: []string{}, + expectedNoEndpoints: true, + expectedNoEndpointsServiceExists: false, + expectedError: false, + }, + { + serviceType: "stateful sets with EndpointSlices", + k8sConfigs: []string{` +kind: APIResourceList +apiVersion: v1 +groupVersion: discovery.k8s.io/v1beta1 +resources: + - name: endpointslices + singularName: endpointslice + namespaced: true + kind: EndpointSlice + verbs: + - delete + - deletecollection + - get + - list + - patch + - create + - update + - watch +`, ` +apiVersion: v1 +kind: Service +metadata: + name: name-1 + namespace: ns +spec: + type: LoadBalancer + ports: + - port: 8989`, ` +addressType: IPv4 +apiVersion: discovery.k8s.io/v1beta1 +endpoints: +- addresses: + - 172.17.0.12 + conditions: + ready: true + hostname: name-1-1 + targetRef: + kind: Pod + name: name-1-1 + namespace: ns + topology: + kubernetes.io/hostname: node-1 +- addresses: + - 172.17.0.19 + hostname: name-1-2 + conditions: + ready: true + targetRef: + kind: Pod + name: name-1-2 + namespace: ns + topology: + kubernetes.io/hostname: node-1 +- addresses: + - 172.17.0.20 + hostname: name-1-3 + conditions: + ready: true + targetRef: + kind: Pod + name: name-1-3 + namespace: ns + topology: + kubernetes.io/hostname: node-2 +kind: EndpointSlice +metadata: + labels: + kubernetes.io/service-name: name-1 + name: name-1-f5fad + namespace: ns +ports: +- name: "" + port: 8989`, ` +apiVersion: v1 +kind: Pod +metadata: + name: name-1-1 + namespace: ns + ownerReferences: + - kind: ReplicaSet + name: rs-1 +status: + phase: Running + podIP: 172.17.0.12`, + ` +apiVersion: v1 +kind: Pod +metadata: + name: name-1-2 + namespace: ns + ownerReferences: + - kind: ReplicaSet + name: rs-1 +status: + phase: Running + podIP: 172.17.0.19`, + ` +apiVersion: v1 +kind: Pod +metadata: + name: name-1-3 + namespace: ns + ownerReferences: + - kind: ReplicaSet + name: rs-1 +status: + phase: Running + podIP: 172.17.0.20`, + }, + id: ServiceID{Name: "name-1", Namespace: "ns"}, + hostname: "name-1-3", + port: 6000, + //expectedAddresses: []string{"172.17.0.20:6000"}, + expectedAddresses: []string{}, + //expectedNoEndpoints: false, + expectedNoEndpoints: true, + //expectedNoEndpointsServiceExists: false, + expectedNoEndpointsServiceExists: true, + expectedError: false, + }, + { + serviceType: "service with EndpointSlice without labels", + k8sConfigs: []string{` +kind: APIResourceList +apiVersion: v1 +groupVersion: discovery.k8s.io/v1beta1 +resources: + - name: endpointslices + singularName: endpointslice + namespaced: true + kind: EndpointSlice + verbs: + - delete + - deletecollection + - get + - list + - patch + - create + - update + - watch +`, ` +apiVersion: v1 +kind: Service +metadata: + name: name-5 + namespace: ns +spec: + type: LoadBalancer + ports: + - port: 8989`, ` +addressType: IPv4 +apiVersion: discovery.k8s.io/v1beta1 +endpoints: +- addresses: + - 172.17.0.12 + conditions: + ready: true + hostname: name-1-1 + targetRef: + kind: Pod + name: name-1-1 + namespace: ns + topology: + kubernetes.io/hostname: node-1 +kind: EndpointSlice +metadata: + labels: + name: name-1-f5fad + namespace: ns +ports: +- name: "" + port: 8989`, ` +apiVersion: v1 +kind: Pod +metadata: + name: name-1-1 + namespace: ns + ownerReferences: + - kind: ReplicaSet + name: rs-1 +status: + phase: Running + podIP: 172.17.0.12`, + }, + id: ServiceID{Name: "name-5", Namespace: "ns"}, + port: 8989, + expectedAddresses: []string{}, + expectedNoEndpoints: true, + expectedNoEndpointsServiceExists: true, + expectedError: false, + }, + { + serviceType: "service with IPv6 address type EndpointSlice", + k8sConfigs: []string{` +kind: APIResourceList +apiVersion: v1 +groupVersion: discovery.k8s.io/v1beta1 +resources: + - name: endpointslices + singularName: endpointslice + namespaced: true + kind: EndpointSlice + verbs: + - delete + - deletecollection + - get + - list + - patch + - create + - update + - watch +`, ` +apiVersion: v1 +kind: Service +metadata: + name: name-5 + namespace: ns +spec: + type: LoadBalancer + ports: + - port: 9000`, ` +addressType: IPv6 +apiVersion: discovery.k8s.io/v1beta1 +endpoints: +- addresses: + - 0:0:0:0:0:0:0:1 + conditions: + ready: true + targetRef: + kind: Pod + name: name-5-1 + namespace: ns + topology: + kubernetes.io/hostname: node-1 +kind: EndpointSlice +metadata: + labels: + name: name-5-f65dv + namespace: ns + ownerReferences: + - apiVersion: v1 + kind: Service + name: name-5 +ports: +- name: "" + port: 9000`, ` +apiVersion: v1 +kind: Pod +metadata: + name: name-5-1 + namespace: ns + ownerReferences: + - kind: ReplicaSet + name: rs-1 +status: + phase: Running + podIP: 0:0:0:0:0:0:0:1`, + }, + id: ServiceID{Name: "name-5", Namespace: "ns"}, + port: 9000, + expectedAddresses: []string{}, + expectedNoEndpoints: true, + expectedNoEndpointsServiceExists: true, + expectedError: false, + }, } { tt := tt // pin t.Run("subscribes listener to "+tt.serviceType, func(t *testing.T) { @@ -621,6 +1200,65 @@ subsets: ` apiVersion: v1 kind: Pod +metadata: + name: name1-1 + namespace: ns +status: + phase: Running + podIP: 172.17.0.12`} + + k8sConfigsWithES := []string{` +kind: APIResourceList +apiVersion: v1 +groupVersion: discovery.k8s.io/v1beta1 +resources: + - name: endpointslices + singularName: endpointslice + namespaced: true + kind: EndpointSlice + verbs: + - delete + - deletecollection + - get + - list + - patch + - create + - update + - watch +`, ` +apiVersion: v1 +kind: Service +metadata: + name: name1 + namespace: ns +spec: + type: LoadBalancer + ports: + - port: 8989`, ` +addressType: IPv4 +apiVersion: discovery.k8s.io/v1beta1 +endpoints: +- addresses: + - 172.17.0.12 + conditions: + ready: true + targetRef: + kind: Pod + name: name1-1 + namespace: ns + topology: + kubernetes.io/hostname: node-1 +kind: EndpointSlice +metadata: + labels: + kubernetes.io/service-name: name1 + name: name1-del + namespace: ns +ports: +- name: "" + port: 8989`, ` +apiVersion: v1 +kind: Pod metadata: name: name1-1 namespace: ns @@ -636,6 +1274,7 @@ status: port Port objectToDelete interface{} deletingServices bool + hasSliceAccess bool }{ { serviceType: "can delete endpoints", @@ -671,6 +1310,24 @@ status: objectToDelete: &corev1.Service{ObjectMeta: metav1.ObjectMeta{Name: "name1", Namespace: "ns"}}, deletingServices: true, }, + { + serviceType: "can delete EndpointSlices", + k8sConfigs: k8sConfigsWithES, + id: ServiceID{Name: "name1", Namespace: "ns"}, + port: 8989, + hostname: "name1-1", + objectToDelete: createTestEndpointSlice(), + hasSliceAccess: true, + }, + { + serviceType: "can delete EndpointSlices when wrapped in a DeletedFinalStateUnknown", + k8sConfigs: k8sConfigsWithES, + id: ServiceID{Name: "name1", Namespace: "ns"}, + port: 8989, + hostname: "name1-1", + objectToDelete: createTestEndpointSlice(), + hasSliceAccess: true, + }, } { tt := tt // pin @@ -693,7 +1350,8 @@ status: if tt.deletingServices { watcher.deleteService(tt.objectToDelete) - + } else if tt.hasSliceAccess { + watcher.deleteEndpointSlice(tt.objectToDelete) } else { watcher.deleteEndpoints(tt.objectToDelete) } @@ -952,6 +1610,26 @@ func endpoints(identity string) *corev1.Endpoints { } } +func createTestEndpointSlice() *dv1beta1.EndpointSlice { + return &dv1beta1.EndpointSlice{ + AddressType: "IPv4", + ObjectMeta: metav1.ObjectMeta{Name: "name1-del", Namespace: "ns", Labels: map[string]string{dv1beta1.LabelServiceName: "name1"}}, + Endpoints: []dv1beta1.Endpoint{ + { + Addresses: []string{"172.17.0.12"}, + Conditions: dv1beta1.EndpointConditions{Ready: func(b bool) *bool { return &b }(true)}, + TargetRef: &corev1.ObjectReference{Name: "name1-1", Namespace: "ns", Kind: "Pod"}, + }, + }, + Ports: []dv1beta1.EndpointPort{ + { + Name: func(s string) *string { return &s }(""), + Port: func(i int32) *int32 { return &i }(8989), + }, + }, + } +} + func TestEndpointsChangeDetection(t *testing.T) { k8sConfigs := []string{` diff --git a/controller/cmd/destination/main.go b/controller/cmd/destination/main.go index 5dbc3d1fe2207..cb7d0db4fd3ec 100644 --- a/controller/cmd/destination/main.go +++ b/controller/cmd/destination/main.go @@ -37,7 +37,7 @@ func Main(args []string) { k8sAPI, err := k8s.InitializeAPI( *kubeConfigPath, true, - k8s.Endpoint, k8s.Pod, k8s.RS, k8s.Svc, k8s.SP, k8s.TS, k8s.Job, + k8s.Endpoint, k8s.ES, k8s.Pod, k8s.RS, k8s.Svc, k8s.SP, k8s.TS, k8s.Job, ) if err != nil { log.Fatalf("Failed to initialize K8s API: %s", err)