Skip to content

Commit

Permalink
Merge pull request #5808 from emissary-ingress/flynn/pr5798
Browse files Browse the repository at this point in the history
Include PR5798
  • Loading branch information
kflynn authored Dec 9, 2024
2 parents 20e3f63 + f307255 commit c2cd9dd
Show file tree
Hide file tree
Showing 13 changed files with 293 additions and 94 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ it will be removed; but as it won't be user-visible this isn't considered a brea
instead of the Mapping name, which could reduce the cache's effectiveness. This has been fixed so
that the correct key is used. ([Incorrect Cache Key for Mapping])

- Feature: Emissary-ingress now supports resolving Endpoints from EndpointSlices in addition to the
existing support for Endpoints, supporting Services with more than 1000 endpoints.

[Incorrect Cache Key for Mapping]: https://github.com/emissary-ingress/emissary/issues/5714

## [3.9.0] November 13, 2023
Expand Down
88 changes: 82 additions & 6 deletions cmd/entrypoint/endpoint_routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,33 @@ func makeEndpoints(ctx context.Context, ksnap *snapshot.KubernetesSnapshot, cons

result := map[string][]*ambex.Endpoint{}

for _, k8sEp := range ksnap.Endpoints {
svc, ok := k8sServices[key(k8sEp)]
if !ok {
continue
svcEndpointSlices := map[string][]*kates.EndpointSlice{}

// Collect all the EndpointSlices for each service if the "kubernetes.io/service-name" label is present
for _, k8sEndpointSlice := range ksnap.EndpointSlices {
if serviceName, labelExists := k8sEndpointSlice.Labels["kubernetes.io/service-name"]; labelExists {
svcKey := fmt.Sprintf("%s:%s", k8sEndpointSlice.Namespace, serviceName)
svcEndpointSlices[svcKey] = append(svcEndpointSlices[svcKey], k8sEndpointSlice)
}
for _, ep := range k8sEndpointsToAmbex(k8sEp, svc) {
result[ep.ClusterName] = append(result[ep.ClusterName], ep)
}

// Map each service to its corresponding endpoints from all its EndpointSlices, or fall back to Endpoints if needed
for svcKey, svc := range k8sServices {
if slices, ok := svcEndpointSlices[svcKey]; ok && len(slices) > 0 {
for _, slice := range slices {
for _, ep := range k8sEndpointSlicesToAmbex(slice, svc) {
result[ep.ClusterName] = append(result[ep.ClusterName], ep)
}
}
} else {
// Fallback to using Endpoints if no valid EndpointSlices are available
for _, k8sEp := range ksnap.Endpoints {
if key(k8sEp) == svcKey {
for _, ep := range k8sEndpointsToAmbex(k8sEp, svc) {
result[ep.ClusterName] = append(result[ep.ClusterName], ep)
}
}
}
}
}

Expand Down Expand Up @@ -97,6 +117,62 @@ func k8sEndpointsToAmbex(ep *kates.Endpoints, svc *kates.Service) (result []*amb
return
}

func k8sEndpointSlicesToAmbex(endpointSlice *kates.EndpointSlice, svc *kates.Service) (result []*ambex.Endpoint) {
portmap := map[string][]string{}
for _, p := range svc.Spec.Ports {
port := fmt.Sprintf("%d", p.Port)
targetPort := p.TargetPort.String()
if targetPort == "" {
targetPort = fmt.Sprintf("%d", p.Port)
}

portmap[targetPort] = append(portmap[targetPort], port)
if p.Name != "" {
portmap[targetPort] = append(portmap[targetPort], p.Name)
portmap[p.Name] = append(portmap[p.Name], port)
}
if len(svc.Spec.Ports) == 1 {
portmap[targetPort] = append(portmap[targetPort], "")
portmap[""] = append(portmap[""], port)
portmap[""] = append(portmap[""], "")
}
}

for _, endpoint := range endpointSlice.Endpoints {
for _, port := range endpointSlice.Ports {
if *port.Protocol == kates.ProtocolTCP || *port.Protocol == kates.ProtocolUDP {
portNames := map[string]bool{}
candidates := []string{fmt.Sprintf("%d", *port.Port), *port.Name, ""}
for _, c := range candidates {
if pns, ok := portmap[c]; ok {
for _, pn := range pns {
portNames[pn] = true
}
}
}
if *endpoint.Conditions.Ready {
for _, address := range endpoint.Addresses {
for pn := range portNames {
sep := "/"
if pn == "" {
sep = ""
}
result = append(result, &ambex.Endpoint{
ClusterName: fmt.Sprintf("k8s/%s/%s%s%s", svc.Namespace, svc.Name, sep, pn),
Ip: address,
Port: uint32(*port.Port),
Protocol: string(*port.Protocol),
})
}
}
}
}
}
}

return
}

func consulEndpointsToAmbex(ctx context.Context, endpoints consulwatch.Endpoints) (result []*ambex.Endpoint) {
for _, ep := range endpoints.Endpoints {
addrs, err := net.LookupHost(ep.Address)
Expand Down
90 changes: 84 additions & 6 deletions cmd/entrypoint/endpoint_routing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,29 @@ import (
)

func TestEndpointRouting(t *testing.T) {
f := entrypoint.RunFake(t, entrypoint.FakeConfig{EnvoyConfig: true}, nil)
// Create Mapping, Service, and Endpoints resources to start.
assert.NoError(t, f.Upsert(makeMapping("default", "foo", "/foo", "foo", "endpoint")))
assert.NoError(t, f.Upsert(makeService("default", "foo")))
endpoint, port, err := makeSliceEndpoint(8080, "1.2.3.4")
require.NoError(t, err)
assert.NoError(t, f.Upsert(makeEndpointSlice("default", "foo", "foo", endpoint, port)))
f.Flush()
snap, err := f.GetSnapshot(HasMapping("default", "foo"))
require.NoError(t, err)
assert.NotNil(t, snap)

// Check that the endpoints resource we created at the start was properly propagated.
endpoints, err := f.GetEndpoints(HasEndpoints("k8s/default/foo"))
require.NoError(t, err)
assert.Equal(t, "1.2.3.4", endpoints.Entries["k8s/default/foo"][0].Ip)
assert.Equal(t, uint32(8080), endpoints.Entries["k8s/default/foo"][0].Port)
assert.Contains(t, endpoints.Entries, "k8s/default/foo/80")
assert.Equal(t, "1.2.3.4", endpoints.Entries["k8s/default/foo/80"][0].Ip)
assert.Equal(t, uint32(8080), endpoints.Entries["k8s/default/foo/80"][0].Port)
}

func TestEndpointRoutingWithNoEndpointSlices(t *testing.T) {
f := entrypoint.RunFake(t, entrypoint.FakeConfig{EnvoyConfig: true}, nil)
// Create Mapping, Service, and Endpoints resources to start.
assert.NoError(t, f.Upsert(makeMapping("default", "foo", "/foo", "foo", "endpoint")))
Expand Down Expand Up @@ -57,9 +80,9 @@ service: foo
resolver: endpoint`,
}
assert.NoError(t, f.Upsert(svc))
subset, err := makeSubset(8080, "1.2.3.4")
endpoint, port, err := makeSliceEndpoint(8080, "1.2.3.4")
require.NoError(t, err)
assert.NoError(t, f.Upsert(makeEndpoints("default", "foo", subset)))
assert.NoError(t, f.Upsert(makeEndpointSlice("default", "foo", "foo", endpoint, port)))
f.Flush()
snap, err := f.GetSnapshot(HasService("default", "foo"))
require.NoError(t, err)
Expand Down Expand Up @@ -97,9 +120,9 @@ func TestEndpointRoutingMultiplePorts(t *testing.T) {
},
},
}))
subset, err := makeSubset("cleartext", 8080, "encrypted", 8443, "1.2.3.4")
endpoint, port, err := makeSliceEndpoint("cleartext", 8080, "encrypted", 8443, "1.2.3.4")
require.NoError(t, err)
assert.NoError(t, f.Upsert(makeEndpoints("default", "foo", subset)))
assert.NoError(t, f.Upsert(makeEndpointSlice("default", "foo", "foo", endpoint, port)))
f.Flush()
snap, err := f.GetSnapshot(HasMapping("default", "foo"))
require.NoError(t, err)
Expand Down Expand Up @@ -155,9 +178,9 @@ func TestEndpointRoutingIP(t *testing.T) {
func TestEndpointRoutingMappingCreation(t *testing.T) {
f := entrypoint.RunFake(t, entrypoint.FakeConfig{}, nil)
assert.NoError(t, f.Upsert(makeService("default", "foo")))
subset, err := makeSubset(8080, "1.2.3.4")
endpoint, port, err := makeSliceEndpoint(8080, "1.2.3.4")
require.NoError(t, err)
assert.NoError(t, f.Upsert(makeEndpoints("default", "foo", subset)))
assert.NoError(t, f.Upsert(makeEndpointSlice("default", "foo", "foo", endpoint, port)))
f.Flush()
f.AssertEndpointsEmpty(timeout)
assert.NoError(t, f.UpsertYAML(`
Expand Down Expand Up @@ -275,3 +298,58 @@ func makeSubset(args ...interface{}) (kates.EndpointSubset, error) {

return kates.EndpointSubset{Addresses: addrs, Ports: ports}, nil
}

func makeEndpointSlice(namespace, name, serviceName string, endpoint []kates.Endpoint, port []kates.EndpointSlicePort) *kates.EndpointSlice {
return &kates.EndpointSlice{
TypeMeta: kates.TypeMeta{Kind: "EndpointSlices", APIVersion: "v1.discovery.k8s.io"},
ObjectMeta: kates.ObjectMeta{
Namespace: namespace,
Name: name,
Labels: map[string]string{
"kubernetes.io/service-name": serviceName,
},
},
Endpoints: endpoint,
Ports: port,
}
}

func makeSliceEndpoint(args ...interface{}) ([]kates.Endpoint, []kates.EndpointSlicePort, error) {
var endpoints []kates.Endpoint
var ports []kates.EndpointSlicePort
var currentPortName string

for _, arg := range args {
switch v := arg.(type) {
case int:
portName := currentPortName
ports = append(ports, kates.EndpointSlicePort{Name: &portName, Port: int32Ptr(int32(v)), Protocol: protocolPtr(kates.ProtocolTCP)})
case string:
IP := net.ParseIP(v)
if IP != nil {
endpoints = append(endpoints, kates.Endpoint{
Addresses: []string{v},
Conditions: kates.EndpointConditions{
Ready: &[]bool{true}[0],
Serving: &[]bool{true}[0],
Terminating: &[]bool{false}[0],
},
})
} else {
currentPortName = v // Assume it's a port name if not an IP address
}
default:
return nil, nil, fmt.Errorf("unrecognized type: %T", v)
}
}

return endpoints, ports, nil
}

func int32Ptr(i int32) *int32 {
return &i
}

func protocolPtr(p kates.Protocol) *kates.Protocol {
return &p
}
27 changes: 24 additions & 3 deletions cmd/entrypoint/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type endpointRoutingInfo struct {
module moduleResolver
endpointWatches map[string]bool // A set to track the subset of kubernetes endpoints we care about.
previousWatches map[string]bool
endpointSlices []*kates.EndpointSlice
}

type ResolverType int
Expand All @@ -47,7 +48,7 @@ func (rt ResolverType) String() string {

// newEndpointRoutingInfo creates a shiny new struct to hold information about
// resolvers in use and such.
func newEndpointRoutingInfo() endpointRoutingInfo {
func newEndpointRoutingInfo(endpointSlices []*kates.EndpointSlice) endpointRoutingInfo {
return endpointRoutingInfo{
// resolverTypes keeps track of the type of every resolver in the system.
// It starts out empty.
Expand All @@ -59,6 +60,7 @@ func newEndpointRoutingInfo() endpointRoutingInfo {
resolverTypes: make(map[string]ResolverType),
// Track which endpoints we actually want to watch.
endpointWatches: make(map[string]bool),
endpointSlices: endpointSlices,
}
}

Expand All @@ -71,6 +73,7 @@ func (eri *endpointRoutingInfo) reconcileEndpointWatches(ctx context.Context, s
eri.module = moduleResolver{}
eri.previousWatches = eri.endpointWatches
eri.endpointWatches = map[string]bool{}
eri.endpointSlices = s.EndpointSlices

// Phase one processes all the configuration stuff that Mappings depend on. Right now this
// includes Modules and Resolvers. When we are done with Phase one we have processed enough
Expand Down Expand Up @@ -228,7 +231,7 @@ func (eri *endpointRoutingInfo) checkMapping(ctx context.Context, mapping *amb.M

if eri.resolverTypes[resolver] == KubernetesEndpointResolver {
svc, ns, _ := eri.module.parseService(ctx, mapping, service, mapping.GetNamespace())
eri.endpointWatches[fmt.Sprintf("%s:%s", ns, svc)] = true
eri.mapEndpointWatches(ns, svc)
}
}

Expand All @@ -247,7 +250,25 @@ func (eri *endpointRoutingInfo) checkTCPMapping(ctx context.Context, tcpmapping

if eri.resolverTypes[resolver] == KubernetesEndpointResolver {
svc, ns, _ := eri.module.parseService(ctx, tcpmapping, service, tcpmapping.GetNamespace())
eri.endpointWatches[fmt.Sprintf("%s:%s", ns, svc)] = true
eri.mapEndpointWatches(ns, svc)
}
}

// mapEndpointWatches figures out what service discovery object available for a given service.
func (eri *endpointRoutingInfo) mapEndpointWatches(namespace string, serviceName string) {
foundEndpointSlice := false
for _, endpointSlice := range eri.endpointSlices {
// Check if this EndpointSlice matches the target service and namespace, and has the required label
if endpointSlice.Namespace == namespace {
if service, ok := endpointSlice.Labels["kubernetes.io/service-name"]; ok && service == serviceName {
eri.endpointWatches[fmt.Sprintf("%s:%s", namespace, endpointSlice.Name)] = true
foundEndpointSlice = true
}
}
}
if !foundEndpointSlice {
//Use Endpoint if EndpointSlice doesn't exist
eri.endpointWatches[fmt.Sprintf("%s:%s", namespace, serviceName)] = true
}
}

Expand Down
9 changes: 5 additions & 4 deletions cmd/entrypoint/interesting_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,11 @@ func GetInterestingTypes(ctx context.Context, serverTypeList []kates.APIResource
//
// Note that we pull `secrets.v1.` in to "K8sSecrets". ReconcileSecrets will pull
// over the ones we need into "Secrets" and "Endpoints" respectively.
"Services": {{typename: "services.v1."}}, // New in Kubernetes 0.16.0 (2015-04-28) (v1beta{1..3} before that)
"Endpoints": {{typename: "endpoints.v1.", fieldselector: endpointFs}}, // New in Kubernetes 0.16.0 (2015-04-28) (v1beta{1..3} before that)
"K8sSecrets": {{typename: "secrets.v1."}}, // New in Kubernetes 0.16.0 (2015-04-28) (v1beta{1..3} before that)
"ConfigMaps": {{typename: "configmaps.v1.", fieldselector: configMapFs}},
"Services": {{typename: "services.v1."}}, // New in Kubernetes 0.16.0 (2015-04-28) (v1beta{1..3} before that)
"Endpoints": {{typename: "endpoints.v1.", fieldselector: endpointFs}}, // New in Kubernetes 0.16.0 (2015-04-28) (v1beta{1..3} before that)
"EndpointSlices": {{typename: "endpointslices.v1.discovery.k8s.io", fieldselector: endpointFs}},
"K8sSecrets": {{typename: "secrets.v1."}}, // New in Kubernetes 0.16.0 (2015-04-28) (v1beta{1..3} before that)
"ConfigMaps": {{typename: "configmaps.v1.", fieldselector: configMapFs}},
"Ingresses": {
{typename: "ingresses.v1beta1.extensions"}, // New in Kubernetes 1.2.0 (2016-03-16), gone in Kubernetes 1.22.0 (2021-08-04)
{typename: "ingresses.v1beta1.networking.k8s.io"}, // New in Kubernetes 1.14.0 (2019-03-25), gone in Kubernetes 1.22.0 (2021-08-04)
Expand Down
Loading

0 comments on commit c2cd9dd

Please sign in to comment.