From c5e28b8fbe85d104d47830ad42aab0ba222d9bdc Mon Sep 17 00:00:00 2001 From: ajaychoudhary-hotstar Date: Mon, 28 Oct 2024 18:13:17 +0530 Subject: [PATCH 1/7] Added support for endpointslices Signed-off-by: ajaychoudhary-hotstar --- CHANGELOG.md | 2 + cmd/entrypoint/endpoint_routing.go | 43 +++++++----- cmd/entrypoint/endpoint_routing_test.go | 65 ++++++++++++------- cmd/entrypoint/endpoints.go | 21 +++++- cmd/entrypoint/interesting_types.go | 9 +-- cmd/entrypoint/testdata/custom-endpoints.yaml | 58 +++++++++-------- cmd/entrypoint/testdata/hello-endpoints.yaml | 29 +++++---- cmd/entrypoint/testdata/qotm-endpoints.yaml | 29 +++++---- .../testutil_fake_k8s_store_test.go | 2 + cmd/entrypoint/watcher.go | 7 +- pkg/kates/aliases.go | 4 ++ pkg/snapshot/v1/types.go | 9 +-- 12 files changed, 166 insertions(+), 112 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c11c79d246..01864b3311 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -107,6 +107,8 @@ it will be removed; but as it won't be user-visible this isn't considered a brea instead of the Mapping name, which could reduce the cache's effectiveness. This has been fixed so that the correct key is used. ([Incorrect Cache Key for Mapping]) +- Change: Updated Emissary-Ingress to use EndpointSlices instead of Endpoints to support more than 1000 Backends + [Incorrect Cache Key for Mapping]: https://github.com/emissary-ingress/emissary/issues/5714 ## [3.9.0] November 13, 2023 diff --git a/cmd/entrypoint/endpoint_routing.go b/cmd/entrypoint/endpoint_routing.go index ba9bb59b22..3334720fcf 100644 --- a/cmd/entrypoint/endpoint_routing.go +++ b/cmd/entrypoint/endpoint_routing.go @@ -20,13 +20,22 @@ func makeEndpoints(ctx context.Context, ksnap *snapshot.KubernetesSnapshot, cons result := map[string][]*ambex.Endpoint{} - for _, k8sEp := range ksnap.Endpoints { - svc, ok := k8sServices[key(k8sEp)] - if !ok { - continue - } - for _, ep := range k8sEndpointsToAmbex(k8sEp, svc) { - result[ep.ClusterName] = append(result[ep.ClusterName], ep) + svcEndpointSlices := map[string][]*kates.EndpointSlice{} + + // Collect all the EndpointSlices for each service + for _, k8sEndpointSlice := range ksnap.EndpointSlices { + svcKey := fmt.Sprintf("%s:%s", k8sEndpointSlice.Namespace, k8sEndpointSlice.Labels["kubernetes.io/service-name"]) + svcEndpointSlices[svcKey] = append(svcEndpointSlices[svcKey], k8sEndpointSlice) + break + } + //Map each service to its corresponding endpoints from all its EndpointSlices + for svcKey, svc := range k8sServices { + if slices, ok := svcEndpointSlices[svcKey]; ok { + for _, slice := range slices { + for _, ep := range k8sEndpointsToAmbex(slice, svc) { + result[ep.ClusterName] = append(result[ep.ClusterName], ep) + } + } } } @@ -43,7 +52,7 @@ func key(resource kates.Object) string { return fmt.Sprintf("%s:%s", resource.GetNamespace(), resource.GetName()) } -func k8sEndpointsToAmbex(ep *kates.Endpoints, svc *kates.Service) (result []*ambex.Endpoint) { +func k8sEndpointsToAmbex(endpointSlice *kates.EndpointSlice, svc *kates.Service) (result []*ambex.Endpoint) { portmap := map[string][]string{} for _, p := range svc.Spec.Ports { port := fmt.Sprintf("%d", p.Port) @@ -64,11 +73,11 @@ func k8sEndpointsToAmbex(ep *kates.Endpoints, svc *kates.Service) (result []*amb } } - for _, subset := range ep.Subsets { - for _, port := range subset.Ports { - if port.Protocol == kates.ProtocolTCP || port.Protocol == kates.ProtocolUDP { + for _, endpoint := range endpointSlice.Endpoints { + for _, port := range endpointSlice.Ports { + if *port.Protocol == kates.ProtocolTCP || *port.Protocol == kates.ProtocolUDP { portNames := map[string]bool{} - candidates := []string{fmt.Sprintf("%d", port.Port), port.Name, ""} + candidates := []string{fmt.Sprintf("%d", *port.Port), *port.Name, ""} for _, c := range candidates { if pns, ok := portmap[c]; ok { for _, pn := range pns { @@ -76,17 +85,17 @@ func k8sEndpointsToAmbex(ep *kates.Endpoints, svc *kates.Service) (result []*amb } } } - for _, addr := range subset.Addresses { + for _, address := range endpoint.Addresses { for pn := range portNames { sep := "/" if pn == "" { sep = "" } result = append(result, &ambex.Endpoint{ - ClusterName: fmt.Sprintf("k8s/%s/%s%s%s", ep.Namespace, ep.Name, sep, pn), - Ip: addr.IP, - Port: uint32(port.Port), - Protocol: string(port.Protocol), + ClusterName: fmt.Sprintf("k8s/%s/%s%s%s", svc.Namespace, svc.Name, sep, pn), + Ip: address, + Port: uint32(*port.Port), + Protocol: string(*port.Protocol), }) } } diff --git a/cmd/entrypoint/endpoint_routing_test.go b/cmd/entrypoint/endpoint_routing_test.go index 4f5982dd74..54d7beedc7 100644 --- a/cmd/entrypoint/endpoint_routing_test.go +++ b/cmd/entrypoint/endpoint_routing_test.go @@ -24,9 +24,9 @@ func TestEndpointRouting(t *testing.T) { // Create Mapping, Service, and Endpoints resources to start. assert.NoError(t, f.Upsert(makeMapping("default", "foo", "/foo", "foo", "endpoint"))) assert.NoError(t, f.Upsert(makeService("default", "foo"))) - subset, err := makeSubset(8080, "1.2.3.4") + endpoint, port, err := makeSliceEndpoint(8080, "1.2.3.4") require.NoError(t, err) - assert.NoError(t, f.Upsert(makeEndpoints("default", "foo", subset))) + assert.NoError(t, f.Upsert(makeEndpointSlice("default", "foo", "foo", endpoint, port))) f.Flush() snap, err := f.GetSnapshot(HasMapping("default", "foo")) require.NoError(t, err) @@ -57,9 +57,9 @@ service: foo resolver: endpoint`, } assert.NoError(t, f.Upsert(svc)) - subset, err := makeSubset(8080, "1.2.3.4") + endpoint, port, err := makeSliceEndpoint(8080, "1.2.3.4") require.NoError(t, err) - assert.NoError(t, f.Upsert(makeEndpoints("default", "foo", subset))) + assert.NoError(t, f.Upsert(makeEndpointSlice("default", "foo", "foo", endpoint, port))) f.Flush() snap, err := f.GetSnapshot(HasService("default", "foo")) require.NoError(t, err) @@ -97,9 +97,9 @@ func TestEndpointRoutingMultiplePorts(t *testing.T) { }, }, })) - subset, err := makeSubset("cleartext", 8080, "encrypted", 8443, "1.2.3.4") + endpoint, port, err := makeSliceEndpoint(8080, "1.2.3.4") require.NoError(t, err) - assert.NoError(t, f.Upsert(makeEndpoints("default", "foo", subset))) + assert.NoError(t, f.Upsert(makeEndpointSlice("default", "foo", "foo", endpoint, port))) f.Flush() snap, err := f.GetSnapshot(HasMapping("default", "foo")) require.NoError(t, err) @@ -155,9 +155,9 @@ func TestEndpointRoutingIP(t *testing.T) { func TestEndpointRoutingMappingCreation(t *testing.T) { f := entrypoint.RunFake(t, entrypoint.FakeConfig{}, nil) assert.NoError(t, f.Upsert(makeService("default", "foo"))) - subset, err := makeSubset(8080, "1.2.3.4") + endpoint, port, err := makeSliceEndpoint(8080, "1.2.3.4") require.NoError(t, err) - assert.NoError(t, f.Upsert(makeEndpoints("default", "foo", subset))) + assert.NoError(t, f.Upsert(makeEndpointSlice("default", "foo", "foo", endpoint, port))) f.Flush() f.AssertEndpointsEmpty(timeout) assert.NoError(t, f.UpsertYAML(` @@ -242,36 +242,51 @@ func makeService(namespace, name string) *kates.Service { } } -func makeEndpoints(namespace, name string, subsets ...kates.EndpointSubset) *kates.Endpoints { - return &kates.Endpoints{ - TypeMeta: kates.TypeMeta{Kind: "Endpoints"}, - ObjectMeta: kates.ObjectMeta{Namespace: namespace, Name: name}, - Subsets: subsets, +func makeEndpointSlice(namespace, name, serviceName string, endpoint []kates.Endpoint, port []kates.EndpointSlicePort) *kates.EndpointSlice { + return &kates.EndpointSlice{ + TypeMeta: kates.TypeMeta{Kind: "EndpointSlices", APIVersion: "v1.discovery.k8s.io"}, + ObjectMeta: kates.ObjectMeta{ + Namespace: namespace, + Name: name, + Labels: map[string]string{ + "kubernetes.io/service-name": serviceName, + }, + }, + Endpoints: endpoint, + Ports: port, } } -// makeSubset provides a convenient way to kubernetes EndpointSubset resources. Any int args are -// ports, any ip address strings are addresses, and no ip address strings are used as the port name -// for any ports that follow them in the arg list. -func makeSubset(args ...interface{}) (kates.EndpointSubset, error) { +func makeSliceEndpoint(args ...interface{}) ([]kates.Endpoint, []kates.EndpointSlicePort, error) { + var endpoints []kates.Endpoint + var ports []kates.EndpointSlicePort portName := "" - var ports []kates.EndpointPort - var addrs []kates.EndpointAddress + for _, arg := range args { switch v := arg.(type) { case int: - ports = append(ports, kates.EndpointPort{Name: portName, Port: int32(v), Protocol: kates.ProtocolTCP}) + ports = append(ports, kates.EndpointSlicePort{Name: &portName, Port: int32Ptr(int32(v)), Protocol: protocolPtr(kates.ProtocolTCP)}) case string: IP := net.ParseIP(v) - if IP == nil { - portName = v + if IP != nil { + endpoints = append(endpoints, kates.Endpoint{ + Addresses: []string{v}, + }) } else { - addrs = append(addrs, kates.EndpointAddress{IP: v}) + portName = v // Assume it's a port name if not an IP address } default: - return kates.EndpointSubset{}, fmt.Errorf("unrecognized type: %T", v) + return nil, nil, fmt.Errorf("unrecognized type: %T", v) } } - return kates.EndpointSubset{Addresses: addrs, Ports: ports}, nil + return endpoints, ports, nil +} + +func int32Ptr(i int32) *int32 { + return &i +} + +func protocolPtr(p kates.Protocol) *kates.Protocol { + return &p } diff --git a/cmd/entrypoint/endpoints.go b/cmd/entrypoint/endpoints.go index b20cf58b55..b83330b68c 100644 --- a/cmd/entrypoint/endpoints.go +++ b/cmd/entrypoint/endpoints.go @@ -22,6 +22,7 @@ type endpointRoutingInfo struct { module moduleResolver endpointWatches map[string]bool // A set to track the subset of kubernetes endpoints we care about. previousWatches map[string]bool + endpointSlices []*kates.EndpointSlice } type ResolverType int @@ -47,7 +48,7 @@ func (rt ResolverType) String() string { // newEndpointRoutingInfo creates a shiny new struct to hold information about // resolvers in use and such. -func newEndpointRoutingInfo() endpointRoutingInfo { +func newEndpointRoutingInfo(endpointSlices []*kates.EndpointSlice) endpointRoutingInfo { return endpointRoutingInfo{ // resolverTypes keeps track of the type of every resolver in the system. // It starts out empty. @@ -59,6 +60,7 @@ func newEndpointRoutingInfo() endpointRoutingInfo { resolverTypes: make(map[string]ResolverType), // Track which endpoints we actually want to watch. endpointWatches: make(map[string]bool), + endpointSlices: endpointSlices, } } @@ -71,6 +73,7 @@ func (eri *endpointRoutingInfo) reconcileEndpointWatches(ctx context.Context, s eri.module = moduleResolver{} eri.previousWatches = eri.endpointWatches eri.endpointWatches = map[string]bool{} + eri.endpointSlices = s.EndpointSlices // Phase one processes all the configuration stuff that Mappings depend on. Right now this // includes Modules and Resolvers. When we are done with Phase one we have processed enough @@ -228,7 +231,13 @@ func (eri *endpointRoutingInfo) checkMapping(ctx context.Context, mapping *amb.M if eri.resolverTypes[resolver] == KubernetesEndpointResolver { svc, ns, _ := eri.module.parseService(ctx, mapping, service, mapping.GetNamespace()) - eri.endpointWatches[fmt.Sprintf("%s:%s", ns, svc)] = true + for _, endpointSlice := range eri.endpointSlices { + // Check if this EndpointSlice matches the target service and namespace + if endpointSlice.Namespace == ns && endpointSlice.Labels["kubernetes.io/service-name"] == svc { + eri.endpointWatches[fmt.Sprintf("%s:%s", ns, endpointSlice.Name)] = true + + } + } } } @@ -247,7 +256,13 @@ func (eri *endpointRoutingInfo) checkTCPMapping(ctx context.Context, tcpmapping if eri.resolverTypes[resolver] == KubernetesEndpointResolver { svc, ns, _ := eri.module.parseService(ctx, tcpmapping, service, tcpmapping.GetNamespace()) - eri.endpointWatches[fmt.Sprintf("%s:%s", ns, svc)] = true + for _, endpointSlice := range eri.endpointSlices { + // Check if this EndpointSlice matches the target service and namespace + if endpointSlice.Namespace == ns && endpointSlice.Labels["kubernetes.io/service-name"] == svc { + eri.endpointWatches[fmt.Sprintf("%s:%s", ns, endpointSlice.Name)] = true + + } + } } } diff --git a/cmd/entrypoint/interesting_types.go b/cmd/entrypoint/interesting_types.go index 1cc5dfd1ef..d1f67f2014 100644 --- a/cmd/entrypoint/interesting_types.go +++ b/cmd/entrypoint/interesting_types.go @@ -80,10 +80,11 @@ func GetInterestingTypes(ctx context.Context, serverTypeList []kates.APIResource // // Note that we pull `secrets.v1.` in to "K8sSecrets". ReconcileSecrets will pull // over the ones we need into "Secrets" and "Endpoints" respectively. - "Services": {{typename: "services.v1."}}, // New in Kubernetes 0.16.0 (2015-04-28) (v1beta{1..3} before that) - "Endpoints": {{typename: "endpoints.v1.", fieldselector: endpointFs}}, // New in Kubernetes 0.16.0 (2015-04-28) (v1beta{1..3} before that) - "K8sSecrets": {{typename: "secrets.v1."}}, // New in Kubernetes 0.16.0 (2015-04-28) (v1beta{1..3} before that) - "ConfigMaps": {{typename: "configmaps.v1.", fieldselector: configMapFs}}, + "Services": {{typename: "services.v1."}}, // New in Kubernetes 0.16.0 (2015-04-28) (v1beta{1..3} before that) + "Endpoints": {{typename: "endpoints.v1.", fieldselector: endpointFs}}, // New in Kubernetes 0.16.0 (2015-04-28) (v1beta{1..3} before that) + "EndpointSlices": {{typename: "endpointslices.v1.discovery.k8s.io", fieldselector: endpointFs}}, + "K8sSecrets": {{typename: "secrets.v1."}}, // New in Kubernetes 0.16.0 (2015-04-28) (v1beta{1..3} before that) + "ConfigMaps": {{typename: "configmaps.v1.", fieldselector: configMapFs}}, "Ingresses": { {typename: "ingresses.v1beta1.extensions"}, // New in Kubernetes 1.2.0 (2016-03-16), gone in Kubernetes 1.22.0 (2021-08-04) {typename: "ingresses.v1beta1.networking.k8s.io"}, // New in Kubernetes 1.14.0 (2019-03-25), gone in Kubernetes 1.22.0 (2021-08-04) diff --git a/cmd/entrypoint/testdata/custom-endpoints.yaml b/cmd/entrypoint/testdata/custom-endpoints.yaml index 1d3506a01e..6ae4c53e1b 100644 --- a/cmd/entrypoint/testdata/custom-endpoints.yaml +++ b/cmd/entrypoint/testdata/custom-endpoints.yaml @@ -1,46 +1,48 @@ --- # All the IP addresses, pod names, etc., are basically made up. These -# aren't meant to be functional, just to exercise the machinery of +# aren't meant to be functional, just to exercise the machinery of # filting things in the watcher. -apiVersion: v1 -kind: Endpoints +apiVersion: discovery.k8s.io/v1 +kind: EndpointSlice metadata: name: random-1 -subsets: +endpoints: - addresses: - - ip: 10.42.0.55 - nodeName: flynn-2a - targetRef: - kind: Pod - name: random-6db467b4d7-zzzz1 - - ip: 10.42.0.56 - nodeName: flynn-2b - targetRef: - kind: Pod - name: random-6db467b4d7-zzzz1 + - "10.42.0.55" + nodeName: flynn-2a + targetRef: + kind: Pod + name: random-6db467b4d7-zzzz1 +- addresses: + - "10.42.0.56" + nodeName: flynn-2b + targetRef: + kind: Pod + name: random-6db467b4d7-zzzz1 ports: - port: 5000 protocol: TCP --- # All the IP addresses, pod names, etc., are basically made up. These -# aren't meant to be functional, just to exercise the machinery of +# aren't meant to be functional, just to exercise the machinery of # filting things in the watcher. -apiVersion: v1 -kind: Endpoints +apiVersion: discovery.k8s.io/v1 +kind: EndpointSlice metadata: name: random-2 -subsets: +endpoints: +- addresses: + - "10.42.0.65" + nodeName: flynn-2a + targetRef: + kind: Pod + name: rande-6db467b4d7-zzzz2 - addresses: - - ip: 10.42.0.65 - nodeName: flynn-2a - targetRef: - kind: Pod - name: rande-6db467b4d7-zzzz2 - - ip: 10.42.0.66 - nodeName: flynn-2b - targetRef: - kind: Pod - name: rande-6db467b4d7-zzzz2 + - "10.42.0.66" + nodeName: flynn-2b + targetRef: + kind: Pod + name: rande-6db467b4d7-zzzz2 ports: - port: 5000 protocol: TCP diff --git a/cmd/entrypoint/testdata/hello-endpoints.yaml b/cmd/entrypoint/testdata/hello-endpoints.yaml index 90ab83f10d..7832f6b3b2 100644 --- a/cmd/entrypoint/testdata/hello-endpoints.yaml +++ b/cmd/entrypoint/testdata/hello-endpoints.yaml @@ -11,24 +11,25 @@ spec: targetPort: http-api --- # All the IP addresses, pod names, etc., are basically made up. These -# aren't meant to be functional, just to exercise the machinery of +# aren't meant to be functional, just to exercise the machinery of # filting things in the watcher. -apiVersion: v1 -kind: Endpoints +apiVersion: discovery.k8s.io/v1 +kind: EndpointSlice metadata: name: hello -subsets: +endpoints: - addresses: - - ip: 10.42.0.15 - nodeName: flynn-2a - targetRef: - kind: Pod - name: hello-6db467b4d7-n45n7 - - ip: 10.42.0.16 - nodeName: flynn-2b - targetRef: - kind: Pod - name: hello-6db467b4d7-n45n7 + - "10.42.0.15" + nodeName: flynn-2a + targetRef: + kind: Pod + name: hello-6db467b4d7-n45n7 +- addresses: + - "10.42.0.16" + nodeName: flynn-2b + targetRef: + kind: Pod + name: hello-6db467b4d7-n45n7 ports: - port: 5000 protocol: TCP diff --git a/cmd/entrypoint/testdata/qotm-endpoints.yaml b/cmd/entrypoint/testdata/qotm-endpoints.yaml index f360455968..843ee9630f 100644 --- a/cmd/entrypoint/testdata/qotm-endpoints.yaml +++ b/cmd/entrypoint/testdata/qotm-endpoints.yaml @@ -11,24 +11,25 @@ spec: targetPort: http-api --- # All the IP addresses, pod names, etc., are basically made up. These -# aren't meant to be functional, just to exercise the machinery of +# aren't meant to be functional, just to exercise the machinery of # filting things in the watcher. -apiVersion: v1 -kind: Endpoints +apiVersion: discovery.k8s.io/v1 +kind: EndpointSlice metadata: name: qotm -subsets: +endpoints: +- addresses: + - "10.42.0.15" + nodeName: flynn-2a + targetRef: + kind: Pod + name: qotm-6db467b4d7-n45n7 - addresses: - - ip: 10.42.0.15 - nodeName: flynn-2a - targetRef: - kind: Pod - name: qotm-6db467b4d7-n45n7 - - ip: 10.42.0.16 - nodeName: flynn-2b - targetRef: - kind: Pod - name: qotm-6db467b4d7-n45n7 + - "10.42.0.16" + nodeName: flynn-2b + targetRef: + kind: Pod + name: qotm-6db467b4d7-n45n7 ports: - port: 5000 protocol: TCP diff --git a/cmd/entrypoint/testutil_fake_k8s_store_test.go b/cmd/entrypoint/testutil_fake_k8s_store_test.go index 9588abdd9d..01de89389c 100644 --- a/cmd/entrypoint/testutil_fake_k8s_store_test.go +++ b/cmd/entrypoint/testutil_fake_k8s_store_test.go @@ -211,6 +211,8 @@ func canonGVK(rawString string) (canonKind string, canonGroupVersion string, err return "Service", "v1", nil case "endpoints": return "Endpoints", "v1", nil + case "endpointslices": + return "EndpointSlices", "v1.discovery.k8s.io", nil case "secret", "secrets": return "Secret", "v1", nil case "configmap", "configmaps": diff --git a/cmd/entrypoint/watcher.go b/cmd/entrypoint/watcher.go index b1eee5f3d4..d4b74fb653 100644 --- a/cmd/entrypoint/watcher.go +++ b/cmd/entrypoint/watcher.go @@ -361,12 +361,13 @@ func NewSnapshotHolder(ambassadorMeta *snapshot.AmbassadorMetaInfo) (*SnapshotHo if err != nil { return nil, err } + k8sSnapshot := NewKubernetesSnapshot() return &SnapshotHolder{ validator: validator, ambassadorMeta: ambassadorMeta, - k8sSnapshot: NewKubernetesSnapshot(), + k8sSnapshot: k8sSnapshot, consulSnapshot: &snapshot.ConsulSnapshot{}, - endpointRoutingInfo: newEndpointRoutingInfo(), + endpointRoutingInfo: newEndpointRoutingInfo(k8sSnapshot.EndpointSlices), dispatcher: disp, firstReconfig: true, }, nil @@ -491,7 +492,7 @@ func (sh *SnapshotHolder) K8sUpdate( for _, delta := range deltas { sh.unsentDeltas = append(sh.unsentDeltas, delta) - if delta.Kind == "Endpoints" { + if delta.Kind == "EndpointSlice" { key := fmt.Sprintf("%s:%s", delta.Namespace, delta.Name) if sh.endpointRoutingInfo.endpointWatches[key] || sh.dispatcher.IsWatched(delta.Namespace, delta.Name) { endpointsChanged = true diff --git a/pkg/kates/aliases.go b/pkg/kates/aliases.go index d56f5be47e..8da276c97c 100644 --- a/pkg/kates/aliases.go +++ b/pkg/kates/aliases.go @@ -3,6 +3,7 @@ package kates import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" rbacv1 "k8s.io/api/rbac/v1" xv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -67,6 +68,9 @@ type Endpoints = corev1.Endpoints type EndpointSubset = corev1.EndpointSubset type EndpointAddress = corev1.EndpointAddress type EndpointPort = corev1.EndpointPort +type EndpointSlice = discoveryv1.EndpointSlice +type Endpoint = discoveryv1.Endpoint +type EndpointSlicePort = discoveryv1.EndpointPort type Protocol = corev1.Protocol diff --git a/pkg/snapshot/v1/types.go b/pkg/snapshot/v1/types.go index 2019b04bb8..742a6b29dc 100644 --- a/pkg/snapshot/v1/types.go +++ b/pkg/snapshot/v1/types.go @@ -56,10 +56,11 @@ type ConsulSnapshot struct { type KubernetesSnapshot struct { // k8s resources - IngressClasses []*IngressClass `json:"ingressclasses"` - Ingresses []*Ingress `json:"ingresses"` - Services []*kates.Service `json:"service"` - Endpoints []*kates.Endpoints `json:"Endpoints"` + IngressClasses []*IngressClass `json:"ingressclasses"` + Ingresses []*Ingress `json:"ingresses"` + Services []*kates.Service `json:"service"` + Endpoints []*kates.Endpoints `json:"Endpoints"` + EndpointSlices []*kates.EndpointSlice `json:"EndpointSlice"` // ambassador resources Listeners []*amb.Listener `json:"Listener"` From 88712774f463a4a1c2ec8ca064475e1190a5963c Mon Sep 17 00:00:00 2001 From: ajaychoudhary-hotstar Date: Mon, 28 Oct 2024 18:51:51 +0530 Subject: [PATCH 2/7] updated test Yaml Signed-off-by: ajaychoudhary-hotstar --- cmd/entrypoint/testdata/custom-endpoints.yaml | 12 ++++++------ cmd/entrypoint/testdata/hello-endpoints.yaml | 8 ++++---- cmd/entrypoint/testdata/qotm-endpoints.yaml | 6 +++--- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/cmd/entrypoint/testdata/custom-endpoints.yaml b/cmd/entrypoint/testdata/custom-endpoints.yaml index 6ae4c53e1b..017a7d1d61 100644 --- a/cmd/entrypoint/testdata/custom-endpoints.yaml +++ b/cmd/entrypoint/testdata/custom-endpoints.yaml @@ -19,9 +19,9 @@ endpoints: targetRef: kind: Pod name: random-6db467b4d7-zzzz1 - ports: - - port: 5000 - protocol: TCP +ports: +- port: 5000 + protocol: TCP --- # All the IP addresses, pod names, etc., are basically made up. These # aren't meant to be functional, just to exercise the machinery of @@ -43,6 +43,6 @@ endpoints: targetRef: kind: Pod name: rande-6db467b4d7-zzzz2 - ports: - - port: 5000 - protocol: TCP +ports: +- port: 5000 + protocol: TCP diff --git a/cmd/entrypoint/testdata/hello-endpoints.yaml b/cmd/entrypoint/testdata/hello-endpoints.yaml index 7832f6b3b2..438fb49615 100644 --- a/cmd/entrypoint/testdata/hello-endpoints.yaml +++ b/cmd/entrypoint/testdata/hello-endpoints.yaml @@ -24,12 +24,12 @@ endpoints: targetRef: kind: Pod name: hello-6db467b4d7-n45n7 -- addresses: +- addresses: - "10.42.0.16" nodeName: flynn-2b targetRef: kind: Pod name: hello-6db467b4d7-n45n7 - ports: - - port: 5000 - protocol: TCP +ports: +- port: 5000 + protocol: TCP diff --git a/cmd/entrypoint/testdata/qotm-endpoints.yaml b/cmd/entrypoint/testdata/qotm-endpoints.yaml index 843ee9630f..f6423c0a0c 100644 --- a/cmd/entrypoint/testdata/qotm-endpoints.yaml +++ b/cmd/entrypoint/testdata/qotm-endpoints.yaml @@ -30,6 +30,6 @@ endpoints: targetRef: kind: Pod name: qotm-6db467b4d7-n45n7 - ports: - - port: 5000 - protocol: TCP +ports: +- port: 5000 + protocol: TCP From 46ab826f0366c0d3f7147cb86748312084d509c1 Mon Sep 17 00:00:00 2001 From: ajaychoudhary-hotstar Date: Tue, 29 Oct 2024 11:58:20 +0530 Subject: [PATCH 3/7] Removed break Signed-off-by: ajaychoudhary-hotstar --- cmd/entrypoint/endpoint_routing.go | 1 - 1 file changed, 1 deletion(-) diff --git a/cmd/entrypoint/endpoint_routing.go b/cmd/entrypoint/endpoint_routing.go index 3334720fcf..b70a1f5fac 100644 --- a/cmd/entrypoint/endpoint_routing.go +++ b/cmd/entrypoint/endpoint_routing.go @@ -26,7 +26,6 @@ func makeEndpoints(ctx context.Context, ksnap *snapshot.KubernetesSnapshot, cons for _, k8sEndpointSlice := range ksnap.EndpointSlices { svcKey := fmt.Sprintf("%s:%s", k8sEndpointSlice.Namespace, k8sEndpointSlice.Labels["kubernetes.io/service-name"]) svcEndpointSlices[svcKey] = append(svcEndpointSlices[svcKey], k8sEndpointSlice) - break } //Map each service to its corresponding endpoints from all its EndpointSlices for svcKey, svc := range k8sServices { From b1fb2d6bfb5037e56bc794bafdff28ab5ba0c2d1 Mon Sep 17 00:00:00 2001 From: ajaychoudhary-hotstar Date: Wed, 30 Oct 2024 16:27:25 +0530 Subject: [PATCH 4/7] Added endpoints fallback in case endpointslice doesn't exists Signed-off-by: ajaychoudhary-hotstar --- cmd/entrypoint/endpoint_routing.go | 80 ++++++++++++++++++++++--- cmd/entrypoint/endpoint_routing_test.go | 57 ++++++++++++++++++ cmd/entrypoint/endpoints.go | 28 +++++---- cmd/entrypoint/watcher.go | 2 +- 4 files changed, 148 insertions(+), 19 deletions(-) diff --git a/cmd/entrypoint/endpoint_routing.go b/cmd/entrypoint/endpoint_routing.go index b70a1f5fac..6839fec3a6 100644 --- a/cmd/entrypoint/endpoint_routing.go +++ b/cmd/entrypoint/endpoint_routing.go @@ -22,19 +22,31 @@ func makeEndpoints(ctx context.Context, ksnap *snapshot.KubernetesSnapshot, cons svcEndpointSlices := map[string][]*kates.EndpointSlice{} - // Collect all the EndpointSlices for each service + // Collect all the EndpointSlices for each service if the "kubernetes.io/service-name" label is present for _, k8sEndpointSlice := range ksnap.EndpointSlices { - svcKey := fmt.Sprintf("%s:%s", k8sEndpointSlice.Namespace, k8sEndpointSlice.Labels["kubernetes.io/service-name"]) - svcEndpointSlices[svcKey] = append(svcEndpointSlices[svcKey], k8sEndpointSlice) + if serviceName, labelExists := k8sEndpointSlice.Labels["kubernetes.io/service-name"]; labelExists { + svcKey := fmt.Sprintf("%s:%s", k8sEndpointSlice.Namespace, serviceName) + svcEndpointSlices[svcKey] = append(svcEndpointSlices[svcKey], k8sEndpointSlice) + } } - //Map each service to its corresponding endpoints from all its EndpointSlices + + // Map each service to its corresponding endpoints from all its EndpointSlices, or fall back to Endpoints if needed for svcKey, svc := range k8sServices { - if slices, ok := svcEndpointSlices[svcKey]; ok { + if slices, ok := svcEndpointSlices[svcKey]; ok && len(slices) > 0 { for _, slice := range slices { - for _, ep := range k8sEndpointsToAmbex(slice, svc) { + for _, ep := range k8sEndpointSlicesToAmbex(slice, svc) { result[ep.ClusterName] = append(result[ep.ClusterName], ep) } } + } else { + // Fallback to using Endpoints if no valid EndpointSlices are available + for _, k8sEp := range ksnap.Endpoints { + if key(k8sEp) == svcKey { + for _, ep := range k8sEndpointsToAmbex(k8sEp, svc) { + result[ep.ClusterName] = append(result[ep.ClusterName], ep) + } + } + } } } @@ -51,7 +63,61 @@ func key(resource kates.Object) string { return fmt.Sprintf("%s:%s", resource.GetNamespace(), resource.GetName()) } -func k8sEndpointsToAmbex(endpointSlice *kates.EndpointSlice, svc *kates.Service) (result []*ambex.Endpoint) { +func k8sEndpointsToAmbex(ep *kates.Endpoints, svc *kates.Service) (result []*ambex.Endpoint) { + portmap := map[string][]string{} + for _, p := range svc.Spec.Ports { + port := fmt.Sprintf("%d", p.Port) + targetPort := p.TargetPort.String() + if targetPort == "" { + targetPort = fmt.Sprintf("%d", p.Port) + } + + portmap[targetPort] = append(portmap[targetPort], port) + if p.Name != "" { + portmap[targetPort] = append(portmap[targetPort], p.Name) + portmap[p.Name] = append(portmap[p.Name], port) + } + if len(svc.Spec.Ports) == 1 { + portmap[targetPort] = append(portmap[targetPort], "") + portmap[""] = append(portmap[""], port) + portmap[""] = append(portmap[""], "") + } + } + + for _, subset := range ep.Subsets { + for _, port := range subset.Ports { + if port.Protocol == kates.ProtocolTCP || port.Protocol == kates.ProtocolUDP { + portNames := map[string]bool{} + candidates := []string{fmt.Sprintf("%d", port.Port), port.Name, ""} + for _, c := range candidates { + if pns, ok := portmap[c]; ok { + for _, pn := range pns { + portNames[pn] = true + } + } + } + for _, addr := range subset.Addresses { + for pn := range portNames { + sep := "/" + if pn == "" { + sep = "" + } + result = append(result, &ambex.Endpoint{ + ClusterName: fmt.Sprintf("k8s/%s/%s%s%s", ep.Namespace, ep.Name, sep, pn), + Ip: addr.IP, + Port: uint32(port.Port), + Protocol: string(port.Protocol), + }) + } + } + } + } + } + + return +} + +func k8sEndpointSlicesToAmbex(endpointSlice *kates.EndpointSlice, svc *kates.Service) (result []*ambex.Endpoint) { portmap := map[string][]string{} for _, p := range svc.Spec.Ports { port := fmt.Sprintf("%d", p.Port) diff --git a/cmd/entrypoint/endpoint_routing_test.go b/cmd/entrypoint/endpoint_routing_test.go index 54d7beedc7..f132b541c2 100644 --- a/cmd/entrypoint/endpoint_routing_test.go +++ b/cmd/entrypoint/endpoint_routing_test.go @@ -42,6 +42,29 @@ func TestEndpointRouting(t *testing.T) { assert.Equal(t, uint32(8080), endpoints.Entries["k8s/default/foo/80"][0].Port) } +func TestEndpointRoutingWithNoEndpointSlices(t *testing.T) { + f := entrypoint.RunFake(t, entrypoint.FakeConfig{EnvoyConfig: true}, nil) + // Create Mapping, Service, and Endpoints resources to start. + assert.NoError(t, f.Upsert(makeMapping("default", "foo", "/foo", "foo", "endpoint"))) + assert.NoError(t, f.Upsert(makeService("default", "foo"))) + subset, err := makeSubset(8080, "1.2.3.4") + require.NoError(t, err) + assert.NoError(t, f.Upsert(makeEndpoints("default", "foo", subset))) + f.Flush() + snap, err := f.GetSnapshot(HasMapping("default", "foo")) + require.NoError(t, err) + assert.NotNil(t, snap) + + // Check that the endpoints resource we created at the start was properly propagated. + endpoints, err := f.GetEndpoints(HasEndpoints("k8s/default/foo")) + require.NoError(t, err) + assert.Equal(t, "1.2.3.4", endpoints.Entries["k8s/default/foo"][0].Ip) + assert.Equal(t, uint32(8080), endpoints.Entries["k8s/default/foo"][0].Port) + assert.Contains(t, endpoints.Entries, "k8s/default/foo/80") + assert.Equal(t, "1.2.3.4", endpoints.Entries["k8s/default/foo/80"][0].Ip) + assert.Equal(t, uint32(8080), endpoints.Entries["k8s/default/foo/80"][0].Port) +} + func TestEndpointRoutingMappingAnnotations(t *testing.T) { f := entrypoint.RunFake(t, entrypoint.FakeConfig{EnvoyConfig: true}, nil) // Create Mapping, Service, and Endpoints resources to start. @@ -242,6 +265,40 @@ func makeService(namespace, name string) *kates.Service { } } +func makeEndpoints(namespace, name string, subsets ...kates.EndpointSubset) *kates.Endpoints { + return &kates.Endpoints{ + TypeMeta: kates.TypeMeta{Kind: "Endpoints"}, + ObjectMeta: kates.ObjectMeta{Namespace: namespace, Name: name}, + Subsets: subsets, + } +} + +// makeSubset provides a convenient way to kubernetes EndpointSubset resources. Any int args are +// ports, any ip address strings are addresses, and no ip address strings are used as the port name +// for any ports that follow them in the arg list. +func makeSubset(args ...interface{}) (kates.EndpointSubset, error) { + portName := "" + var ports []kates.EndpointPort + var addrs []kates.EndpointAddress + for _, arg := range args { + switch v := arg.(type) { + case int: + ports = append(ports, kates.EndpointPort{Name: portName, Port: int32(v), Protocol: kates.ProtocolTCP}) + case string: + IP := net.ParseIP(v) + if IP == nil { + portName = v + } else { + addrs = append(addrs, kates.EndpointAddress{IP: v}) + } + default: + return kates.EndpointSubset{}, fmt.Errorf("unrecognized type: %T", v) + } + } + + return kates.EndpointSubset{Addresses: addrs, Ports: ports}, nil +} + func makeEndpointSlice(namespace, name, serviceName string, endpoint []kates.Endpoint, port []kates.EndpointSlicePort) *kates.EndpointSlice { return &kates.EndpointSlice{ TypeMeta: kates.TypeMeta{Kind: "EndpointSlices", APIVersion: "v1.discovery.k8s.io"}, diff --git a/cmd/entrypoint/endpoints.go b/cmd/entrypoint/endpoints.go index b83330b68c..0a62082de4 100644 --- a/cmd/entrypoint/endpoints.go +++ b/cmd/entrypoint/endpoints.go @@ -231,13 +231,7 @@ func (eri *endpointRoutingInfo) checkMapping(ctx context.Context, mapping *amb.M if eri.resolverTypes[resolver] == KubernetesEndpointResolver { svc, ns, _ := eri.module.parseService(ctx, mapping, service, mapping.GetNamespace()) - for _, endpointSlice := range eri.endpointSlices { - // Check if this EndpointSlice matches the target service and namespace - if endpointSlice.Namespace == ns && endpointSlice.Labels["kubernetes.io/service-name"] == svc { - eri.endpointWatches[fmt.Sprintf("%s:%s", ns, endpointSlice.Name)] = true - - } - } + eri.mapEndpointWatches(ns, svc) } } @@ -256,14 +250,26 @@ func (eri *endpointRoutingInfo) checkTCPMapping(ctx context.Context, tcpmapping if eri.resolverTypes[resolver] == KubernetesEndpointResolver { svc, ns, _ := eri.module.parseService(ctx, tcpmapping, service, tcpmapping.GetNamespace()) - for _, endpointSlice := range eri.endpointSlices { - // Check if this EndpointSlice matches the target service and namespace - if endpointSlice.Namespace == ns && endpointSlice.Labels["kubernetes.io/service-name"] == svc { - eri.endpointWatches[fmt.Sprintf("%s:%s", ns, endpointSlice.Name)] = true + eri.mapEndpointWatches(ns, svc) + } +} +// mapEndpointWatches figures out what service discovery object available for a given service. +func (eri *endpointRoutingInfo) mapEndpointWatches(namespace string, serviceName string) { + foundEndpointSlice := false + for _, endpointSlice := range eri.endpointSlices { + // Check if this EndpointSlice matches the target service and namespace, and has the required label + if endpointSlice.Namespace == namespace { + if service, ok := endpointSlice.Labels["kubernetes.io/service-name"]; ok && service == serviceName { + eri.endpointWatches[fmt.Sprintf("%s:%s", namespace, endpointSlice.Name)] = true + foundEndpointSlice = true } } } + if !foundEndpointSlice { + //Use Endpoint if EndpointSlice doesn't exist + eri.endpointWatches[fmt.Sprintf("%s:%s", namespace, serviceName)] = true + } } func (m *moduleResolver) parseService(ctx context.Context, resource kates.Object, svcName, svcNamespace string) (name string, namespace string, port string) { diff --git a/cmd/entrypoint/watcher.go b/cmd/entrypoint/watcher.go index d4b74fb653..d28796b8df 100644 --- a/cmd/entrypoint/watcher.go +++ b/cmd/entrypoint/watcher.go @@ -492,7 +492,7 @@ func (sh *SnapshotHolder) K8sUpdate( for _, delta := range deltas { sh.unsentDeltas = append(sh.unsentDeltas, delta) - if delta.Kind == "EndpointSlice" { + if delta.Kind == "EndpointSlice" || delta.Kind == "Endpoints" { key := fmt.Sprintf("%s:%s", delta.Namespace, delta.Name) if sh.endpointRoutingInfo.endpointWatches[key] || sh.dispatcher.IsWatched(delta.Namespace, delta.Name) { endpointsChanged = true From ab7b539a4742312559b8e525ef2b70c7e79a0087 Mon Sep 17 00:00:00 2001 From: ajaychoudhary-hotstar Date: Fri, 8 Nov 2024 19:20:27 +0530 Subject: [PATCH 5/7] Added condition to take only Ready pods for load balancing Signed-off-by: ajaychoudhary-hotstar --- cmd/entrypoint/endpoint_routing.go | 24 +++++++++++++----------- cmd/entrypoint/endpoint_routing_test.go | 7 ++++++- pkg/kates/aliases.go | 1 + 3 files changed, 20 insertions(+), 12 deletions(-) diff --git a/cmd/entrypoint/endpoint_routing.go b/cmd/entrypoint/endpoint_routing.go index 6839fec3a6..3e0c84ab2e 100644 --- a/cmd/entrypoint/endpoint_routing.go +++ b/cmd/entrypoint/endpoint_routing.go @@ -150,18 +150,20 @@ func k8sEndpointSlicesToAmbex(endpointSlice *kates.EndpointSlice, svc *kates.Ser } } } - for _, address := range endpoint.Addresses { - for pn := range portNames { - sep := "/" - if pn == "" { - sep = "" + if *endpoint.Conditions.Ready { + for _, address := range endpoint.Addresses { + for pn := range portNames { + sep := "/" + if pn == "" { + sep = "" + } + result = append(result, &ambex.Endpoint{ + ClusterName: fmt.Sprintf("k8s/%s/%s%s%s", svc.Namespace, svc.Name, sep, pn), + Ip: address, + Port: uint32(*port.Port), + Protocol: string(*port.Protocol), + }) } - result = append(result, &ambex.Endpoint{ - ClusterName: fmt.Sprintf("k8s/%s/%s%s%s", svc.Namespace, svc.Name, sep, pn), - Ip: address, - Port: uint32(*port.Port), - Protocol: string(*port.Protocol), - }) } } } diff --git a/cmd/entrypoint/endpoint_routing_test.go b/cmd/entrypoint/endpoint_routing_test.go index f132b541c2..a2d47a3025 100644 --- a/cmd/entrypoint/endpoint_routing_test.go +++ b/cmd/entrypoint/endpoint_routing_test.go @@ -120,7 +120,7 @@ func TestEndpointRoutingMultiplePorts(t *testing.T) { }, }, })) - endpoint, port, err := makeSliceEndpoint(8080, "1.2.3.4") + endpoint, port, err := makeSliceEndpoint("cleartext", 8080, "encrypted", 8443, "1.2.3.4") require.NoError(t, err) assert.NoError(t, f.Upsert(makeEndpointSlice("default", "foo", "foo", endpoint, port))) f.Flush() @@ -328,6 +328,11 @@ func makeSliceEndpoint(args ...interface{}) ([]kates.Endpoint, []kates.EndpointS if IP != nil { endpoints = append(endpoints, kates.Endpoint{ Addresses: []string{v}, + Conditions: kates.EndpointConditions{ + Ready: &[]bool{true}[0], + Serving: &[]bool{true}[0], + Terminating: &[]bool{false}[0], + }, }) } else { portName = v // Assume it's a port name if not an IP address diff --git a/pkg/kates/aliases.go b/pkg/kates/aliases.go index 8da276c97c..6c18031204 100644 --- a/pkg/kates/aliases.go +++ b/pkg/kates/aliases.go @@ -71,6 +71,7 @@ type EndpointPort = corev1.EndpointPort type EndpointSlice = discoveryv1.EndpointSlice type Endpoint = discoveryv1.Endpoint type EndpointSlicePort = discoveryv1.EndpointPort +type EndpointConditions = discoveryv1.EndpointConditions type Protocol = corev1.Protocol From f8829ee1d86fbf22be3269adf61628a027780038 Mon Sep 17 00:00:00 2001 From: ajaychoudhary-hotstar Date: Thu, 21 Nov 2024 14:35:45 +0530 Subject: [PATCH 6/7] Fixed test case Signed-off-by: ajaychoudhary-hotstar --- cmd/entrypoint/endpoint_routing_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cmd/entrypoint/endpoint_routing_test.go b/cmd/entrypoint/endpoint_routing_test.go index a2d47a3025..ebb6b7ba79 100644 --- a/cmd/entrypoint/endpoint_routing_test.go +++ b/cmd/entrypoint/endpoint_routing_test.go @@ -317,11 +317,12 @@ func makeEndpointSlice(namespace, name, serviceName string, endpoint []kates.End func makeSliceEndpoint(args ...interface{}) ([]kates.Endpoint, []kates.EndpointSlicePort, error) { var endpoints []kates.Endpoint var ports []kates.EndpointSlicePort - portName := "" + var currentPortName string for _, arg := range args { switch v := arg.(type) { case int: + portName := currentPortName ports = append(ports, kates.EndpointSlicePort{Name: &portName, Port: int32Ptr(int32(v)), Protocol: protocolPtr(kates.ProtocolTCP)}) case string: IP := net.ParseIP(v) @@ -335,7 +336,7 @@ func makeSliceEndpoint(args ...interface{}) ([]kates.Endpoint, []kates.EndpointS }, }) } else { - portName = v // Assume it's a port name if not an IP address + currentPortName = v // Assume it's a port name if not an IP address } default: return nil, nil, fmt.Errorf("unrecognized type: %T", v) From f30725562c0e0ccb3e0d24507c57a0b7f8091179 Mon Sep 17 00:00:00 2001 From: Flynn Date: Mon, 25 Nov 2024 23:45:19 -0500 Subject: [PATCH 7/7] Clean up releaseNotes/CHANGELOG and 'make generate' Signed-off-by: Flynn --- CHANGELOG.md | 3 ++- docs/releaseNotes.yml | 7 +++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 01864b3311..773ff3721d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -107,7 +107,8 @@ it will be removed; but as it won't be user-visible this isn't considered a brea instead of the Mapping name, which could reduce the cache's effectiveness. This has been fixed so that the correct key is used. ([Incorrect Cache Key for Mapping]) -- Change: Updated Emissary-Ingress to use EndpointSlices instead of Endpoints to support more than 1000 Backends +- Feature: Emissary-ingress now supports resolving Endpoints from EndpointSlices in addition to the + existing support for Endpoints, supporting Services with more than 1000 endpoints. [Incorrect Cache Key for Mapping]: https://github.com/emissary-ingress/emissary/issues/5714 diff --git a/docs/releaseNotes.yml b/docs/releaseNotes.yml index e352a30eb0..281513dcfb 100644 --- a/docs/releaseNotes.yml +++ b/docs/releaseNotes.yml @@ -68,6 +68,13 @@ items: - title: "Incorrect Cache Key for Mapping" link: https://github.com/emissary-ingress/emissary/issues/5714 + - title: Add support for EndpointSlices to the Endpoints resolver + type: feature + body: >- + $productName$ now supports resolving Endpoints from EndpointSlices + in addition to the existing support for Endpoints, supporting Services + with more than 1000 endpoints. + - version: 3.9.0 prevVersion: 3.8.0 date: '2023-11-13'