Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't reload when use-cluster-ip endpoints update, and change the ingress use-cluster-ip implementation to use the cluster ip instead of the fqdn #5318

Merged
merged 8 commits into from
Apr 5, 2024
27 changes: 7 additions & 20 deletions internal/configs/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,34 +534,21 @@ func createUpstream(ingEx *IngressEx, name string, backend *networking.IngressBa
endps = []string{}
}

if cfg.UseClusterIP {
fqdn := fmt.Sprintf("%s.%s.svc.cluster.local:%d", backend.Service.Name, ingEx.Ingress.Namespace, backend.Service.Port.Number)
for _, endp := range endps {
upsServers = append(upsServers, version1.UpstreamServer{
Address: fqdn,
Address: endp,
MaxFails: cfg.MaxFails,
MaxConns: cfg.MaxConns,
FailTimeout: cfg.FailTimeout,
SlowStart: cfg.SlowStart,
Resolve: isExternalNameSvc,
})
}
if len(upsServers) > 0 {
sort.Slice(upsServers, func(i, j int) bool {
return upsServers[i].Address < upsServers[j].Address
})
ups.UpstreamServers = upsServers
} else {
for _, endp := range endps {
upsServers = append(upsServers, version1.UpstreamServer{
Address: endp,
MaxFails: cfg.MaxFails,
MaxConns: cfg.MaxConns,
FailTimeout: cfg.FailTimeout,
SlowStart: cfg.SlowStart,
Resolve: isExternalNameSvc,
})
}
if len(upsServers) > 0 {
sort.Slice(upsServers, func(i, j int) bool {
return upsServers[i].Address < upsServers[j].Address
})
ups.UpstreamServers = upsServers
}
}
}

Expand Down
6 changes: 3 additions & 3 deletions internal/configs/ingress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ func createExpectedConfigForMergeableCafeIngressWithUseClusterIP() version1.Ingr
UpstreamZoneSize: upstreamZoneSize,
UpstreamServers: []version1.UpstreamServer{
{
Address: "coffee-svc.default.svc.cluster.local:80",
Address: "10.0.0.1:80",
MaxFails: 1,
MaxConns: 0,
FailTimeout: "10s",
Expand Down Expand Up @@ -803,7 +803,7 @@ func createExpectedConfigForCafeIngressWithUseClusterIP() version1.IngressNginxC
UpstreamZoneSize: upstreamZoneSize,
UpstreamServers: []version1.UpstreamServer{
{
Address: "coffee-svc.default.svc.cluster.local:80",
Address: "10.0.0.1:80",
MaxFails: 1,
MaxConns: 0,
FailTimeout: "10s",
Expand All @@ -817,7 +817,7 @@ func createExpectedConfigForCafeIngressWithUseClusterIP() version1.IngressNginxC
UpstreamZoneSize: upstreamZoneSize,
UpstreamServers: []version1.UpstreamServer{
{
Address: "tea-svc.default.svc.cluster.local:80",
Address: "10.0.0.2:80",
MaxFails: 1,
MaxConns: 0,
FailTimeout: "10s",
Expand Down
119 changes: 101 additions & 18 deletions internal/k8s/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -842,35 +842,51 @@ func (lbc *LoadBalancerController) syncEndpointSlices(task task) bool {
}

endpointSlice := obj.(*discovery_v1.EndpointSlice)
svcResource := lbc.configuration.FindResourcesForService(endpointSlice.Namespace, endpointSlice.Labels["kubernetes.io/service-name"])
svcName := endpointSlice.Labels["kubernetes.io/service-name"]
svcResource := lbc.configuration.FindResourcesForService(endpointSlice.Namespace, svcName)

resourceExes := lbc.createExtendedResources(svcResource)

if len(resourceExes.IngressExes) > 0 {
resourcesFound = true
glog.V(3).Infof("Updating EndpointSlices for %v", resourceExes.IngressExes)
err = lbc.configurator.UpdateEndpoints(resourceExes.IngressExes)
if err != nil {
glog.Errorf("Error updating EndpointSlices for %v: %v", resourceExes.IngressExes, err)
for _, ingEx := range resourceExes.IngressExes {
if lbc.ingressRequiresEndpointsUpdate(ingEx, svcName) {
resourcesFound = true
glog.V(3).Infof("Updating EndpointSlices for %v", resourceExes.IngressExes)
err = lbc.configurator.UpdateEndpoints(resourceExes.IngressExes)
if err != nil {
glog.Errorf("Error updating EndpointSlices for %v: %v", resourceExes.IngressExes, err)
}
break
}
}
}

if len(resourceExes.MergeableIngresses) > 0 {
resourcesFound = true
glog.V(3).Infof("Updating EndpointSlices for %v", resourceExes.MergeableIngresses)
err = lbc.configurator.UpdateEndpointsMergeableIngress(resourceExes.MergeableIngresses)
if err != nil {
glog.Errorf("Error updating EndpointSlices for %v: %v", resourceExes.MergeableIngresses, err)
for _, mergeableIngresses := range resourceExes.MergeableIngresses {
if lbc.mergeableIngressRequiresEndpointsUpdate(mergeableIngresses, svcName) {
resourcesFound = true
glog.V(3).Infof("Updating EndpointSlices for %v", resourceExes.MergeableIngresses)
err = lbc.configurator.UpdateEndpointsMergeableIngress(resourceExes.MergeableIngresses)
if err != nil {
glog.Errorf("Error updating EndpointSlices for %v: %v", resourceExes.MergeableIngresses, err)
}
break
}
}
}

if lbc.areCustomResourcesEnabled {
if len(resourceExes.VirtualServerExes) > 0 {
resourcesFound = true
glog.V(3).Infof("Updating EndpointSlices for %v", resourceExes.VirtualServerExes)
err := lbc.configurator.UpdateEndpointsForVirtualServers(resourceExes.VirtualServerExes)
if err != nil {
glog.Errorf("Error updating EndpointSlices for %v: %v", resourceExes.VirtualServerExes, err)
for _, vsEx := range resourceExes.VirtualServerExes {
if lbc.virtualServerRequiresEndpointsUpdate(vsEx, svcName) {
resourcesFound = true
glog.V(3).Infof("Updating EndpointSlices for %v", resourceExes.VirtualServerExes)
err := lbc.configurator.UpdateEndpointsForVirtualServers(resourceExes.VirtualServerExes)
if err != nil {
glog.Errorf("Error updating EndpointSlices for %v: %v", resourceExes.VirtualServerExes, err)
}
break
}
}
}

Expand All @@ -886,6 +902,63 @@ func (lbc *LoadBalancerController) syncEndpointSlices(task task) bool {
return resourcesFound
}

func (lbc *LoadBalancerController) virtualServerRequiresEndpointsUpdate(vsEx *configs.VirtualServerEx, serviceName string) bool {
for _, upstream := range vsEx.VirtualServer.Spec.Upstreams {
if upstream.Service == serviceName && !upstream.UseClusterIP {
return true
}
}

for _, vsr := range vsEx.VirtualServerRoutes {
for _, upstream := range vsr.Spec.Upstreams {
if upstream.Service == serviceName && !upstream.UseClusterIP {
return true
}
}
}

return false
}

func (lbc *LoadBalancerController) ingressRequiresEndpointsUpdate(ingressEx *configs.IngressEx, serviceName string) bool {
hasUseClusterIPAnnotation := ingressEx.Ingress.Annotations[useClusterIPAnnotation] == "true"

for _, rule := range ingressEx.Ingress.Spec.Rules {
if http := rule.HTTP; http != nil {
for _, path := range http.Paths {
if path.Backend.Service != nil && path.Backend.Service.Name == serviceName {
if !hasUseClusterIPAnnotation {
return true
}
}
}
}
}

if http := ingressEx.Ingress.Spec.DefaultBackend; http != nil {
if http.Service != nil && http.Service.Name == serviceName {
if !hasUseClusterIPAnnotation {
return true
}
}
}

return false
}

func (lbc *LoadBalancerController) mergeableIngressRequiresEndpointsUpdate(mergeableIngresses *configs.MergeableIngresses, serviceName string) bool {
masterIngress := mergeableIngresses.Master
minions := mergeableIngresses.Minions

for _, minion := range minions {
if lbc.ingressRequiresEndpointsUpdate(minion, serviceName) {
return true
}
}

return lbc.ingressRequiresEndpointsUpdate(masterIngress, serviceName)
}

func (lbc *LoadBalancerController) createExtendedResources(resources []Resource) configs.ExtendedResources {
var result configs.ExtendedResources

Expand Down Expand Up @@ -2793,6 +2866,7 @@ func (lbc *LoadBalancerController) createMergeableIngresses(ingConfig *IngressCo
}

func (lbc *LoadBalancerController) createIngressEx(ing *networking.Ingress, validHosts map[string]bool, validMinionPaths map[string]bool) *configs.IngressEx {
var endps []string
ingEx := &configs.IngressEx{
Ingress: ing,
ValidHosts: validHosts,
Expand Down Expand Up @@ -2874,6 +2948,7 @@ func (lbc *LoadBalancerController) createIngressEx(ing *networking.Ingress, vali
ingEx.HealthChecks = make(map[string]*api_v1.Probe)
ingEx.ExternalNameSvcs = make(map[string]bool)
ingEx.PodsByIP = make(map[string]configs.PodInfo)
hasUseClusterIP := ingEx.Ingress.Annotations[configs.UseClusterIPAnnotation] == "true"

if ing.Spec.DefaultBackend != nil {
podEndps := []podEndpoint{}
Expand All @@ -2892,7 +2967,11 @@ func (lbc *LoadBalancerController) createIngressEx(ing *networking.Ingress, vali
glog.Warningf("Error retrieving endpoints for the service %v: %v", ing.Spec.DefaultBackend.Service.Name, err)
}

endps := getIPAddressesFromEndpoints(podEndps)
if svc != nil && !external && hasUseClusterIP {
endps = []string{ipv6SafeAddrPort(svc.Spec.ClusterIP, ing.Spec.DefaultBackend.Service.Port.Number)}
} else {
endps = getIPAddressesFromEndpoints(podEndps)
}

// endps is empty if there was any error before this point
ingEx.Endpoints[ing.Spec.DefaultBackend.Service.Name+configs.GetBackendPortAsString(ing.Spec.DefaultBackend.Service.Port)] = endps
Expand Down Expand Up @@ -2948,7 +3027,11 @@ func (lbc *LoadBalancerController) createIngressEx(ing *networking.Ingress, vali
glog.Warningf("Error retrieving endpoints for the service %v: %v", path.Backend.Service.Name, err)
}

endps := getIPAddressesFromEndpoints(podEndps)
if svc != nil && !external && hasUseClusterIP {
endps = []string{ipv6SafeAddrPort(svc.Spec.ClusterIP, path.Backend.Service.Port.Number)}
} else {
endps = getIPAddressesFromEndpoints(podEndps)
}

// endps is empty if there was any error before this point
ingEx.Endpoints[path.Backend.Service.Name+configs.GetBackendPortAsString(path.Backend.Service.Port)] = endps
Expand Down
20 changes: 20 additions & 0 deletions tests/data/use-cluster-ip/ingress/mergeable/minion-ingress.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: use-cluster-ip-ingress-minion
annotations:
nginx.org/use-cluster-ip: "true"
nginx.org/mergeable-ingress-type: "minion"
spec:
ingressClassName: nginx
rules:
- host: use-cluster-ip.example.com
http:
paths:
- path: /backend1
pathType: Prefix
backend:
service:
name: backend1-svc
port:
number: 80
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
annotations:
nginx.org/mergeable-ingress-type: "master"
name: use-cluster-ip-ingress-master
spec:
ingressClassName: nginx
rules:
- host: use-cluster-ip.example.com
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
annotations:
nginx.org/use-cluster-ip: "true"
name: use-cluster-ip-ingress
spec:
ingressClassName: nginx
rules:
- host: use-cluster-ip.example.com
http:
paths:
- path: /backend1
pathType: Prefix
backend:
service:
name: backend1-svc
port:
number: 80
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
apiVersion: k8s.nginx.org/v1
kind: VirtualServer
metadata:
name: virtual-server
spec:
host: virtual-server.example.com
upstreams:
- name: backend1
service: backend1-svc
port: 80
use-cluster-ip: true
- name: backend2
service: backend2-svc
port: 80
routes:
- path: "/backend1"
action:
pass: backend1
- path: "/backend2"
action:
pass: backend2
Loading
Loading