Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: switch from endpoints to endpointslices #8890

Merged
merged 7 commits into from
Sep 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions charts/ingress-nginx/templates/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ rules:
- get
- list
- watch
- apiGroups:
- discovery.k8s.io
resources:
- endpointslices
verbs:
- list
- watch
- get
{{- end }}

{{- end }}
8 changes: 8 additions & 0 deletions charts/ingress-nginx/templates/controller-role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ rules:
verbs:
- create
- patch
- apiGroups:
- discovery.k8s.io
resources:
- endpointslices
verbs:
- list
- watch
- get
{{- if .Values.podSecurityPolicy.enabled }}
- apiGroups: [{{ template "podSecurityPolicy.apiGroup" . }}]
resources: ['podsecuritypolicies']
Expand Down
13 changes: 6 additions & 7 deletions internal/ingress/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr
sp := svc.Spec.Ports[i]
if sp.Name == svcPort {
if sp.Protocol == proto {
endps = getEndpoints(svc, &sp, proto, n.store.GetServiceEndpoints)
endps = getEndpointsFromSlices(svc, &sp, proto, n.store.GetServiceEndpointsSlices)
break
}
}
Expand All @@ -444,7 +444,7 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr
sp := svc.Spec.Ports[i]
if sp.Port == int32(targetPort) {
if sp.Protocol == proto {
endps = getEndpoints(svc, &sp, proto, n.store.GetServiceEndpoints)
endps = getEndpointsFromSlices(svc, &sp, proto, n.store.GetServiceEndpointsSlices)
break
}
}
Expand Down Expand Up @@ -496,7 +496,7 @@ func (n *NGINXController) getDefaultUpstream() *ingress.Backend {
return upstream
}

endps := getEndpoints(svc, &svc.Spec.Ports[0], apiv1.ProtocolTCP, n.store.GetServiceEndpoints)
endps := getEndpointsFromSlices(svc, &svc.Spec.Ports[0], apiv1.ProtocolTCP, n.store.GetServiceEndpointsSlices)
if len(endps) == 0 {
klog.Warningf("Service %q does not have any active Endpoint", svcKey)
endps = []ingress.Endpoint{n.DefaultEndpoint()}
Expand Down Expand Up @@ -823,7 +823,7 @@ func (n *NGINXController) getBackendServers(ingresses []*ingress.Ingress) ([]*in
}

sp := location.DefaultBackend.Spec.Ports[0]
endps := getEndpoints(location.DefaultBackend, &sp, apiv1.ProtocolTCP, n.store.GetServiceEndpoints)
endps := getEndpointsFromSlices(location.DefaultBackend, &sp, apiv1.ProtocolTCP, n.store.GetServiceEndpointsSlices)
// custom backend is valid only if contains at least one endpoint
if len(endps) > 0 {
name := fmt.Sprintf("custom-default-backend-%v-%v", location.DefaultBackend.GetNamespace(), location.DefaultBackend.GetName())
Expand Down Expand Up @@ -1081,15 +1081,14 @@ func (n *NGINXController) serviceEndpoints(svcKey, backendPort string) ([]ingres
}

klog.V(3).Infof("Obtaining ports information for Service %q", svcKey)

// Ingress with an ExternalName Service and no port defined for that Service
if svc.Spec.Type == apiv1.ServiceTypeExternalName {
if n.cfg.DisableServiceExternalName {
klog.Warningf("Service %q of type ExternalName not allowed due to Ingress configuration.", svcKey)
return upstreams, nil
}
servicePort := externalNamePorts(backendPort, svc)
endps := getEndpoints(svc, servicePort, apiv1.ProtocolTCP, n.store.GetServiceEndpoints)
endps := getEndpointsFromSlices(svc, servicePort, apiv1.ProtocolTCP, n.store.GetServiceEndpointsSlices)
if len(endps) == 0 {
klog.Warningf("Service %q does not have any active Endpoint.", svcKey)
return upstreams, nil
Expand All @@ -1106,7 +1105,7 @@ func (n *NGINXController) serviceEndpoints(svcKey, backendPort string) ([]ingres
servicePort.TargetPort.String() == backendPort ||
servicePort.Name == backendPort {

endps := getEndpoints(svc, &servicePort, apiv1.ProtocolTCP, n.store.GetServiceEndpoints)
endps := getEndpointsFromSlices(svc, &servicePort, apiv1.ProtocolTCP, n.store.GetServiceEndpointsSlices)
if len(endps) == 0 {
klog.Warningf("Service %q does not have any active Endpoint.", svcKey)
}
Expand Down
3 changes: 2 additions & 1 deletion internal/ingress/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/eapache/channels"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
networking "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -85,7 +86,7 @@ func (fakeIngressStore) GetService(key string) (*corev1.Service, error) {
return nil, fmt.Errorf("test error")
}

func (fakeIngressStore) GetServiceEndpoints(key string) (*corev1.Endpoints, error) {
func (fakeIngressStore) GetServiceEndpointsSlices(key string) ([]*discoveryv1.EndpointSlice, error) {
return nil, fmt.Errorf("test error")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,15 @@ import (
"k8s.io/klog/v2"

corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"

"k8s.io/ingress-nginx/internal/k8s"
"k8s.io/ingress-nginx/pkg/apis/ingress"
)

// getEndpoints returns a list of Endpoint structs for a given service/target port combination.
func getEndpoints(s *corev1.Service, port *corev1.ServicePort, proto corev1.Protocol,
getServiceEndpoints func(string) (*corev1.Endpoints, error)) []ingress.Endpoint {
func getEndpointsFromSlices(s *corev1.Service, port *corev1.ServicePort, proto corev1.Protocol,
getServiceEndpointsSlices func(string) ([]*discoveryv1.EndpointSlice, error)) []ingress.Endpoint {

upsServers := []ingress.Endpoint{}

Expand Down Expand Up @@ -74,54 +75,63 @@ func getEndpoints(s *corev1.Service, port *corev1.ServicePort, proto corev1.Prot
})
}

klog.V(3).Infof("Getting Endpoints for Service %q and port %v", svcKey, port.String())
ep, err := getServiceEndpoints(svcKey)
klog.V(3).Infof("Getting Endpoints from endpointSlices for Service %q and port %v", svcKey, port.String())
epss, err := getServiceEndpointsSlices(svcKey)
if err != nil {
klog.Warningf("Error obtaining Endpoints for Service %q: %v", svcKey, err)
return upsServers
}

for _, ss := range ep.Subsets {
matchedPortNameFound := false
for i, epPort := range ss.Ports {

if !reflect.DeepEqual(epPort.Protocol, proto) {
continue
}

var targetPort int32

if port.Name == "" {
// port.Name is optional if there is only one port
targetPort = epPort.Port
matchedPortNameFound = true
} else if port.Name == epPort.Name {
targetPort = epPort.Port
matchedPortNameFound = true
}

if i == len(ss.Ports)-1 && !matchedPortNameFound && port.TargetPort.Type == intstr.Int {
// use service target port if it's a number and no port name matched
// https://github.com/kubernetes/ingress-nginx/issues/7390
targetPort = port.TargetPort.IntVal
// loop over all endpointSlices generated for service
for _, eps := range epss {
var ports []int32
if len(eps.Ports) == 0 {
// When ports is empty, it indicates that there are no defined ports, using svc targePort <- this could be wrong
klog.V(3).Infof("No ports found on endpointSlice, using service TargetPort %v for Service %q", port.String(), svcKey)
ports = append(ports, port.TargetPort.IntVal)
} else {
for _, epPort := range eps.Ports {
if !reflect.DeepEqual(*epPort.Protocol, proto) {
continue
}
var targetPort int32 = 0
if port.Name == "" {
// port.Name is optional if there is only one port
targetPort = *epPort.Port
} else if port.Name == *epPort.Name {
targetPort = *epPort.Port
}
if targetPort == 0 && port.TargetPort.Type == intstr.Int {
// use service target port if it's a number and no port name matched
// https://github.com/kubernetes/ingress-nginx/issues/7390
targetPort = port.TargetPort.IntVal
}
if targetPort == 0 {
continue
}
ports = append(ports, targetPort)
}

if targetPort <= 0 {
}
for _, ep := range eps.Endpoints {
if !(*ep.Conditions.Ready) {
continue
}

for _, epAddress := range ss.Addresses {
ep := net.JoinHostPort(epAddress.IP, strconv.Itoa(int(targetPort)))
if _, exists := processedUpstreamServers[ep]; exists {
continue
}
ups := ingress.Endpoint{
Address: epAddress.IP,
Port: fmt.Sprintf("%v", targetPort),
Target: epAddress.TargetRef,
// ep.Hints

for _, epPort := range ports {
for _, epAddress := range ep.Addresses {
hostPort := net.JoinHostPort(epAddress, strconv.Itoa(int(epPort)))
if _, exists := processedUpstreamServers[hostPort]; exists {
continue
}
ups := ingress.Endpoint{
Address: epAddress,
Port: fmt.Sprintf("%v", epPort),
Target: ep.TargetRef,
}
upsServers = append(upsServers, ups)
processedUpstreamServers[hostPort] = struct{}{}
}
upsServers = append(upsServers, ups)
processedUpstreamServers[ep] = struct{}{}
}
}
}
Expand Down
Loading