diff --git a/internal/configs/ingress.go b/internal/configs/ingress.go
index 3a19025f85..18e3a88b26 100644
--- a/internal/configs/ingress.go
+++ b/internal/configs/ingress.go
@@ -534,34 +534,21 @@ func createUpstream(ingEx *IngressEx, name string, backend *networking.IngressBa
 			endps = []string{}
 		}
 
-		if cfg.UseClusterIP {
-			fqdn := fmt.Sprintf("%s.%s.svc.cluster.local:%d", backend.Service.Name, ingEx.Ingress.Namespace, backend.Service.Port.Number)
+		for _, endp := range endps {
 			upsServers = append(upsServers, version1.UpstreamServer{
-				Address:     fqdn,
+				Address:     endp,
 				MaxFails:    cfg.MaxFails,
 				MaxConns:    cfg.MaxConns,
 				FailTimeout: cfg.FailTimeout,
 				SlowStart:   cfg.SlowStart,
 				Resolve:     isExternalNameSvc,
 			})
+		}
+		if len(upsServers) > 0 {
+			sort.Slice(upsServers, func(i, j int) bool {
+				return upsServers[i].Address < upsServers[j].Address
+			})
 			ups.UpstreamServers = upsServers
-		} else {
-			for _, endp := range endps {
-				upsServers = append(upsServers, version1.UpstreamServer{
-					Address:     endp,
-					MaxFails:    cfg.MaxFails,
-					MaxConns:    cfg.MaxConns,
-					FailTimeout: cfg.FailTimeout,
-					SlowStart:   cfg.SlowStart,
-					Resolve:     isExternalNameSvc,
-				})
-			}
-			if len(upsServers) > 0 {
-				sort.Slice(upsServers, func(i, j int) bool {
-					return upsServers[i].Address < upsServers[j].Address
-				})
-				ups.UpstreamServers = upsServers
-			}
 		}
 	}
 
diff --git a/internal/configs/ingress_test.go b/internal/configs/ingress_test.go
index af727c9378..07b6a3d881 100644
--- a/internal/configs/ingress_test.go
+++ b/internal/configs/ingress_test.go
@@ -700,7 +700,7 @@ func createExpectedConfigForMergeableCafeIngressWithUseClusterIP() version1.Ingr
 			UpstreamZoneSize: upstreamZoneSize,
 			UpstreamServers: []version1.UpstreamServer{
 				{
-					Address:     "coffee-svc.default.svc.cluster.local:80",
+					Address:     "10.0.0.1:80",
 					MaxFails:    1,
 					MaxConns:    0,
 					FailTimeout: "10s",
@@ -803,7 +803,7 @@ func createExpectedConfigForCafeIngressWithUseClusterIP() version1.IngressNginxC
 			UpstreamZoneSize: upstreamZoneSize,
 			UpstreamServers: []version1.UpstreamServer{
 				{
-					Address:     "coffee-svc.default.svc.cluster.local:80",
+					Address:     "10.0.0.1:80",
 					MaxFails:    1,
 					MaxConns:    0,
 					FailTimeout: "10s",
@@ -817,7 +817,7 @@ func createExpectedConfigForCafeIngressWithUseClusterIP() version1.IngressNginxC
 			UpstreamZoneSize: upstreamZoneSize,
 			UpstreamServers: []version1.UpstreamServer{
 				{
-					Address:     "tea-svc.default.svc.cluster.local:80",
+					Address:     "10.0.0.2:80",
 					MaxFails:    1,
 					MaxConns:    0,
 					FailTimeout: "10s",
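Note on the updated expectations above: with use-cluster-ip resolved before createUpstream is called, the upstream address is now the Service's ClusterIP joined with the backend port, which is why "coffee-svc.default.svc.cluster.local:80" becomes "10.0.0.1:80". The standalone sketch below is illustrative only — joinAddrPort is a hypothetical stand-in, not the controller's ipv6SafeAddrPort helper (whose implementation is not part of this diff) — and simply shows an IPv6-safe host:port join with the standard library, which is the shape of address the new test data assumes.

package main

import (
	"fmt"
	"net"
	"strconv"
)

// joinAddrPort is a hypothetical example: net.JoinHostPort brackets IPv6 literals,
// so "fd00::1" with port 80 becomes "[fd00::1]:80", while an IPv4 ClusterIP such as
// "10.0.0.1" comes out as "10.0.0.1:80".
func joinAddrPort(ip string, port int32) string {
	return net.JoinHostPort(ip, strconv.Itoa(int(port)))
}

func main() {
	fmt.Println(joinAddrPort("10.0.0.1", 80))
	fmt.Println(joinAddrPort("fd00::1", 80))
}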
diff --git a/internal/k8s/controller.go b/internal/k8s/controller.go
index 6470fe518f..617ba49caf 100644
--- a/internal/k8s/controller.go
+++ b/internal/k8s/controller.go
@@ -842,35 +842,51 @@ func (lbc *LoadBalancerController) syncEndpointSlices(task task) bool {
 	}
 
 	endpointSlice := obj.(*discovery_v1.EndpointSlice)
-	svcResource := lbc.configuration.FindResourcesForService(endpointSlice.Namespace, endpointSlice.Labels["kubernetes.io/service-name"])
+	svcName := endpointSlice.Labels["kubernetes.io/service-name"]
+	svcResource := lbc.configuration.FindResourcesForService(endpointSlice.Namespace, svcName)
 
 	resourceExes := lbc.createExtendedResources(svcResource)
 
 	if len(resourceExes.IngressExes) > 0 {
-		resourcesFound = true
-		glog.V(3).Infof("Updating EndpointSlices for %v", resourceExes.IngressExes)
-		err = lbc.configurator.UpdateEndpoints(resourceExes.IngressExes)
-		if err != nil {
-			glog.Errorf("Error updating EndpointSlices for %v: %v", resourceExes.IngressExes, err)
+		for _, ingEx := range resourceExes.IngressExes {
+			if lbc.ingressRequiresEndpointsUpdate(ingEx, svcName) {
+				resourcesFound = true
+				glog.V(3).Infof("Updating EndpointSlices for %v", resourceExes.IngressExes)
+				err = lbc.configurator.UpdateEndpoints(resourceExes.IngressExes)
+				if err != nil {
+					glog.Errorf("Error updating EndpointSlices for %v: %v", resourceExes.IngressExes, err)
+				}
+				break
+			}
 		}
 	}
 
 	if len(resourceExes.MergeableIngresses) > 0 {
-		resourcesFound = true
-		glog.V(3).Infof("Updating EndpointSlices for %v", resourceExes.MergeableIngresses)
-		err = lbc.configurator.UpdateEndpointsMergeableIngress(resourceExes.MergeableIngresses)
-		if err != nil {
-			glog.Errorf("Error updating EndpointSlices for %v: %v", resourceExes.MergeableIngresses, err)
+		for _, mergeableIngresses := range resourceExes.MergeableIngresses {
+			if lbc.mergeableIngressRequiresEndpointsUpdate(mergeableIngresses, svcName) {
+				resourcesFound = true
+				glog.V(3).Infof("Updating EndpointSlices for %v", resourceExes.MergeableIngresses)
+				err = lbc.configurator.UpdateEndpointsMergeableIngress(resourceExes.MergeableIngresses)
+				if err != nil {
+					glog.Errorf("Error updating EndpointSlices for %v: %v", resourceExes.MergeableIngresses, err)
+				}
+				break
+			}
 		}
 	}
 
 	if lbc.areCustomResourcesEnabled {
 		if len(resourceExes.VirtualServerExes) > 0 {
-			resourcesFound = true
-			glog.V(3).Infof("Updating EndpointSlices for %v", resourceExes.VirtualServerExes)
-			err := lbc.configurator.UpdateEndpointsForVirtualServers(resourceExes.VirtualServerExes)
-			if err != nil {
-				glog.Errorf("Error updating EndpointSlices for %v: %v", resourceExes.VirtualServerExes, err)
+			for _, vsEx := range resourceExes.VirtualServerExes {
+				if lbc.virtualServerRequiresEndpointsUpdate(vsEx, svcName) {
+					resourcesFound = true
+					glog.V(3).Infof("Updating EndpointSlices for %v", resourceExes.VirtualServerExes)
+					err := lbc.configurator.UpdateEndpointsForVirtualServers(resourceExes.VirtualServerExes)
+					if err != nil {
+						glog.Errorf("Error updating EndpointSlices for %v: %v", resourceExes.VirtualServerExes, err)
+					}
+					break
+				}
 			}
 		}
 
@@ -886,6 +902,63 @@
 	return resourcesFound
 }
 
+func (lbc *LoadBalancerController) virtualServerRequiresEndpointsUpdate(vsEx *configs.VirtualServerEx, serviceName string) bool {
+	for _, upstream := range vsEx.VirtualServer.Spec.Upstreams {
+		if upstream.Service == serviceName && !upstream.UseClusterIP {
+			return true
+		}
+	}
+
+	for _, vsr := range vsEx.VirtualServerRoutes {
+		for _, upstream := range vsr.Spec.Upstreams {
+			if upstream.Service == serviceName && !upstream.UseClusterIP {
+				return true
+			}
+		}
+	}
+
+	return false
+}
+
+func (lbc *LoadBalancerController) ingressRequiresEndpointsUpdate(ingressEx *configs.IngressEx, serviceName string) bool {
+	hasUseClusterIPAnnotation := ingressEx.Ingress.Annotations[useClusterIPAnnotation] == "true"
+
+	for _, rule := range ingressEx.Ingress.Spec.Rules {
+		if http := rule.HTTP; http != nil {
+			for _, path := range http.Paths {
+				if path.Backend.Service != nil && path.Backend.Service.Name == serviceName {
+					if !hasUseClusterIPAnnotation {
+						return true
+					}
+				}
+			}
+		}
+	}
+
+	if http := ingressEx.Ingress.Spec.DefaultBackend; http != nil {
+		if http.Service != nil && http.Service.Name == serviceName {
+			if !hasUseClusterIPAnnotation {
+				return true
+			}
+		}
+	}
+
+	return false
+}
+
+func (lbc *LoadBalancerController) mergeableIngressRequiresEndpointsUpdate(mergeableIngresses *configs.MergeableIngresses, serviceName string) bool {
+	masterIngress := mergeableIngresses.Master
+	minions := mergeableIngresses.Minions
+
+	for _, minion := range minions {
+		if lbc.ingressRequiresEndpointsUpdate(minion, serviceName) {
+			return true
+		}
+	}
+
+	return lbc.ingressRequiresEndpointsUpdate(masterIngress, serviceName)
+}
+
 func (lbc *LoadBalancerController) createExtendedResources(resources []Resource) configs.ExtendedResources {
 	var result configs.ExtendedResources
 
@@ -2793,6 +2866,7 @@ func (lbc *LoadBalancerController) createMergeableIngresses(ingConfig *IngressCo
 }
 
 func (lbc *LoadBalancerController) createIngressEx(ing *networking.Ingress, validHosts map[string]bool, validMinionPaths map[string]bool) *configs.IngressEx {
+	var endps []string
 	ingEx := &configs.IngressEx{
 		Ingress:    ing,
 		ValidHosts: validHosts,
@@ -2874,6 +2948,7 @@ func (lbc *LoadBalancerController) createIngressEx(ing *networking.Ingress, vali
 	ingEx.HealthChecks = make(map[string]*api_v1.Probe)
 	ingEx.ExternalNameSvcs = make(map[string]bool)
 	ingEx.PodsByIP = make(map[string]configs.PodInfo)
+	hasUseClusterIP := ingEx.Ingress.Annotations[configs.UseClusterIPAnnotation] == "true"
 
 	if ing.Spec.DefaultBackend != nil {
 		podEndps := []podEndpoint{}
@@ -2892,7 +2967,11 @@ func (lbc *LoadBalancerController) createIngressEx(ing *networking.Ingress, vali
 			glog.Warningf("Error retrieving endpoints for the service %v: %v", ing.Spec.DefaultBackend.Service.Name, err)
 		}
 
-		endps := getIPAddressesFromEndpoints(podEndps)
+		if svc != nil && !external && hasUseClusterIP {
+			endps = []string{ipv6SafeAddrPort(svc.Spec.ClusterIP, ing.Spec.DefaultBackend.Service.Port.Number)}
+		} else {
+			endps = getIPAddressesFromEndpoints(podEndps)
+		}
 
 		// endps is empty if there was any error before this point
 		ingEx.Endpoints[ing.Spec.DefaultBackend.Service.Name+configs.GetBackendPortAsString(ing.Spec.DefaultBackend.Service.Port)] = endps
@@ -2948,7 +3027,11 @@ func (lbc *LoadBalancerController) createIngressEx(ing *networking.Ingress, vali
 				glog.Warningf("Error retrieving endpoints for the service %v: %v", path.Backend.Service.Name, err)
 			}
 
-			endps := getIPAddressesFromEndpoints(podEndps)
+			if svc != nil && !external && hasUseClusterIP {
+				endps = []string{ipv6SafeAddrPort(svc.Spec.ClusterIP, path.Backend.Service.Port.Number)}
+			} else {
+				endps = getIPAddressesFromEndpoints(podEndps)
+			}
 
 			// endps is empty if there was any error before this point
 			ingEx.Endpoints[path.Backend.Service.Name+configs.GetBackendPortAsString(path.Backend.Service.Port)] = endps
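To make the new gating in syncEndpointSlices concrete, here is a minimal unit-test sketch for ingressRequiresEndpointsUpdate. It is a sketch under assumptions, not part of the PR: the test name is invented, the module import path is assumed, and it would need to live in the internal/k8s package alongside controller.go to reach the unexported method. The annotation key is taken from the test data added later in this diff.

package k8s

import (
	"testing"

	networking "k8s.io/api/networking/v1"
	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/nginxinc/kubernetes-ingress/internal/configs"
)

func TestIngressRequiresEndpointsUpdateSketch(t *testing.T) {
	lbc := &LoadBalancerController{}
	ing := &networking.Ingress{
		ObjectMeta: meta_v1.ObjectMeta{
			Annotations: map[string]string{"nginx.org/use-cluster-ip": "true"},
		},
		Spec: networking.IngressSpec{
			Rules: []networking.IngressRule{{
				IngressRuleValue: networking.IngressRuleValue{
					HTTP: &networking.HTTPIngressRuleValue{
						Paths: []networking.HTTPIngressPath{{
							Backend: networking.IngressBackend{
								Service: &networking.IngressServiceBackend{Name: "backend1-svc"},
							},
						}},
					},
				},
			}},
		},
	}
	ingEx := &configs.IngressEx{Ingress: ing}

	// With the annotation set, endpoint churn for the backend service should not force an update.
	if lbc.ingressRequiresEndpointsUpdate(ingEx, "backend1-svc") {
		t.Error("expected no endpoints update when nginx.org/use-cluster-ip is set")
	}

	// Without the annotation, the same backend should still trigger an update.
	ing.Annotations = nil
	if !lbc.ingressRequiresEndpointsUpdate(ingEx, "backend1-svc") {
		t.Error("expected an endpoints update without the annotation")
	}
}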
diff --git a/tests/data/use-cluster-ip/ingress/mergeable/minion-ingress.yaml b/tests/data/use-cluster-ip/ingress/mergeable/minion-ingress.yaml
new file mode 100644
index 0000000000..2b1ed90bb3
--- /dev/null
+++ b/tests/data/use-cluster-ip/ingress/mergeable/minion-ingress.yaml
@@ -0,0 +1,20 @@
+apiVersion: networking.k8s.io/v1
+kind: Ingress
+metadata:
+  name: use-cluster-ip-ingress-minion
+  annotations:
+    nginx.org/use-cluster-ip: "true"
+    nginx.org/mergeable-ingress-type: "minion"
+spec:
+  ingressClassName: nginx
+  rules:
+  - host: use-cluster-ip.example.com
+    http:
+      paths:
+      - path: /backend1
+        pathType: Prefix
+        backend:
+          service:
+            name: backend1-svc
+            port:
+              number: 80
diff --git a/tests/data/use-cluster-ip/ingress/mergeable/use-cluster-ip-ingress.yaml b/tests/data/use-cluster-ip/ingress/mergeable/use-cluster-ip-ingress.yaml
new file mode 100644
index 0000000000..7089af4e15
--- /dev/null
+++ b/tests/data/use-cluster-ip/ingress/mergeable/use-cluster-ip-ingress.yaml
@@ -0,0 +1,10 @@
+apiVersion: networking.k8s.io/v1
+kind: Ingress
+metadata:
+  annotations:
+    nginx.org/mergeable-ingress-type: "master"
+  name: use-cluster-ip-ingress-master
+spec:
+  ingressClassName: nginx
+  rules:
+  - host: use-cluster-ip.example.com
diff --git a/tests/data/use-cluster-ip/ingress/standard/use-cluster-ip-ingress.yaml b/tests/data/use-cluster-ip/ingress/standard/use-cluster-ip-ingress.yaml
new file mode 100644
index 0000000000..ffbe29c406
--- /dev/null
+++ b/tests/data/use-cluster-ip/ingress/standard/use-cluster-ip-ingress.yaml
@@ -0,0 +1,19 @@
+apiVersion: networking.k8s.io/v1
+kind: Ingress
+metadata:
+  annotations:
+    nginx.org/use-cluster-ip: "true"
+  name: use-cluster-ip-ingress
+spec:
+  ingressClassName: nginx
+  rules:
+  - host: use-cluster-ip.example.com
+    http:
+      paths:
+      - path: /backend1
+        pathType: Prefix
+        backend:
+          service:
+            name: backend1-svc
+            port:
+              number: 80
diff --git a/tests/data/virtual-server-use-cluster-ip/standard/virtual-server.yaml b/tests/data/virtual-server-use-cluster-ip/standard/virtual-server.yaml
new file mode 100644
index 0000000000..7ec4337972
--- /dev/null
+++ b/tests/data/virtual-server-use-cluster-ip/standard/virtual-server.yaml
@@ -0,0 +1,21 @@
+apiVersion: k8s.nginx.org/v1
+kind: VirtualServer
+metadata:
+  name: virtual-server
+spec:
+  host: virtual-server.example.com
+  upstreams:
+  - name: backend1
+    service: backend1-svc
+    port: 80
+    use-cluster-ip: true
+  - name: backend2
+    service: backend2-svc
+    port: 80
+  routes:
+  - path: "/backend1"
+    action:
+      pass: backend1
+  - path: "/backend2"
+    action:
+      pass: backend2
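The VirtualServer fixture above enables use-cluster-ip for backend1 only, which is precisely the distinction virtualServerRequiresEndpointsUpdate (added in the controller.go hunk earlier in this diff) draws. A hedged sketch of how that could be checked in a unit test — the test name and import paths are assumptions, and it would sit in the internal/k8s package:

package k8s

import (
	"testing"

	"github.com/nginxinc/kubernetes-ingress/internal/configs"
	conf_v1 "github.com/nginxinc/kubernetes-ingress/pkg/apis/configuration/v1"
)

func TestVirtualServerRequiresEndpointsUpdateSketch(t *testing.T) {
	lbc := &LoadBalancerController{}
	vsEx := &configs.VirtualServerEx{
		VirtualServer: &conf_v1.VirtualServer{
			Spec: conf_v1.VirtualServerSpec{
				Upstreams: []conf_v1.Upstream{
					{Name: "backend1", Service: "backend1-svc", Port: 80, UseClusterIP: true},
					{Name: "backend2", Service: "backend2-svc", Port: 80},
				},
			},
		},
	}

	// backend1 pins the upstream to the ClusterIP, so its endpoints can churn without an update.
	if lbc.virtualServerRequiresEndpointsUpdate(vsEx, "backend1-svc") {
		t.Error("expected no endpoints update for a use-cluster-ip upstream")
	}
	// backend2 still uses pod endpoints, so it must keep triggering updates.
	if !lbc.virtualServerRequiresEndpointsUpdate(vsEx, "backend2-svc") {
		t.Error("expected an endpoints update for a regular upstream")
	}
}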
diff --git a/tests/suite/test_use_cluster_ip.py b/tests/suite/test_use_cluster_ip.py
new file mode 100644
index 0000000000..fcadc1d4b9
--- /dev/null
+++ b/tests/suite/test_use_cluster_ip.py
@@ -0,0 +1,104 @@
+import pytest
+from settings import TEST_DATA
+from suite.utils.resources_utils import (
+    create_example_app,
+    create_ingress_from_yaml,
+    create_secret_from_yaml,
+    delete_common_app,
+    delete_items_from_yaml,
+    delete_secret,
+    ensure_connection_to_public_endpoint,
+    get_reload_count,
+    replace_secret,
+    scale_deployment,
+    wait_before_test,
+)
+from suite.utils.yaml_utils import get_first_ingress_host_from_yaml, get_name_from_yaml
+
+from tests.suite.utils.custom_assertions import assert_pods_scaled_to_count
+
+
+class UseClusterIPSetup:
+    def __init__(self, ingress_host, metrics_url):
+        self.ingress_host = ingress_host
+        self.metrics_url = metrics_url
+
+
+@pytest.fixture(scope="class")
+def use_cluster_ip_setup(
+    request,
+    kube_apis,
+    ingress_controller_prerequisites,
+    ingress_controller_endpoint,
+    ingress_controller,
+    test_namespace,
+) -> UseClusterIPSetup:
+    print("------------------------- Deploy use-cluster-ip setup -----------------------------------")
+
+    test_data_path = f"{TEST_DATA}/use-cluster-ip/ingress"
+    metrics_url = f"http://{ingress_controller_endpoint.public_ip}:{ingress_controller_endpoint.metrics_port}/metrics"
+
+    ingress_path = f"{test_data_path}/{request.param}/use-cluster-ip-ingress.yaml"
+    create_ingress_from_yaml(kube_apis.networking_v1, test_namespace, ingress_path)
+    if request.param == "mergeable":
+        create_ingress_from_yaml(
+            kube_apis.networking_v1, test_namespace, f"{test_data_path}/{request.param}/minion-ingress.yaml"
+        )
+    create_example_app(kube_apis, "simple", test_namespace)
+
+    wait_before_test(1)
+
+    ingress_host = get_first_ingress_host_from_yaml(ingress_path)
+
+    ensure_connection_to_public_endpoint(
+        ingress_controller_endpoint.public_ip, ingress_controller_endpoint.port, ingress_controller_endpoint.port_ssl
+    )
+
+    def fin():
+        if request.config.getoption("--skip-fixture-teardown") == "no":
+            print("Clean up use-cluster-ip setup")
+            delete_items_from_yaml(kube_apis, ingress_path, test_namespace)
+            delete_common_app(kube_apis, "simple", test_namespace)
+            if request.param == "mergeable":
+                delete_items_from_yaml(
+                    kube_apis,
+                    f"{test_data_path}/{request.param}/minion-ingress.yaml",
+                    test_namespace,
+                )
+
+    request.addfinalizer(fin)
+
+    return UseClusterIPSetup(
+        ingress_host,
+        metrics_url,
+    )
+
+
+@pytest.mark.ingresses
+@pytest.mark.parametrize(
+    "ingress_controller, use_cluster_ip_setup",
+    [
+        pytest.param({"extra_args": ["-enable-prometheus-metrics"]}, "standard"),
+        pytest.param({"extra_args": ["-enable-prometheus-metrics"]}, "mergeable"),
+    ],
+    indirect=True,
+)
+class TestIngressUseClusterIPReloads:
+    def test_ingress_use_cluster_ip_reloads(
+        self, kube_apis, ingress_controller_endpoint, test_namespace, use_cluster_ip_setup
+    ):
+        print("Step 1: get initial reload count")
+        initial_reload_count = get_reload_count(use_cluster_ip_setup.metrics_url)
+
+        print("Step 2: scale the deployment down")
+        scale_deployment(kube_apis.v1, kube_apis.apps_v1_api, "backend1", test_namespace, 1)
+        assert_pods_scaled_to_count(kube_apis.apps_v1_api, kube_apis.v1, "backend1", test_namespace, 1)
+
+        print("Step 3: scale the deployment up")
+        scale_deployment(kube_apis.v1, kube_apis.apps_v1_api, "backend1", test_namespace, 4)
+        assert_pods_scaled_to_count(kube_apis.apps_v1_api, kube_apis.v1, "backend1", test_namespace, 4)
+
+        print("Step 4: get reload count after scaling")
+        reload_count_after_scaling = get_reload_count(use_cluster_ip_setup.metrics_url)
+
+        assert reload_count_after_scaling == initial_reload_count, "Expected: no new reloads"
diff --git a/tests/suite/test_virtual_server_use_cluster_ip_reloads.py b/tests/suite/test_virtual_server_use_cluster_ip_reloads.py
new file mode 100644
index 0000000000..9937f9c2a2
--- /dev/null
+++ b/tests/suite/test_virtual_server_use_cluster_ip_reloads.py
@@ -0,0 +1,48 @@
+import pytest
+from suite.utils.resources_utils import (
+    get_reload_count,
+    scale_deployment,
+    wait_before_test,
+    wait_until_all_pods_are_ready,
+)
+
+from tests.suite.utils.custom_assertions import assert_pods_scaled_to_count
+
+
+@pytest.mark.vs
+@pytest.mark.parametrize(
+    "crd_ingress_controller, virtual_server_setup",
+    [
+        (
+            {
+                "type": "complete",
+                "extra_args": [
+                    "-enable-custom-resources",
+                    "-enable-prometheus-metrics",
+                ],
+            },
+            {"example": "virtual-server-use-cluster-ip", "app_type": "simple"},
+        )
+    ],
+    indirect=True,
+)
+class TestVSUseClusterIP:
+    def test_use_cluster_ip_reloads(
+        self, kube_apis, ingress_controller_endpoint, crd_ingress_controller, virtual_server_setup
+    ):
+        wait_until_all_pods_are_ready(kube_apis.v1, virtual_server_setup.namespace)
+        print("Step 1: get initial reload count")
+        initial_reload_count = get_reload_count(virtual_server_setup.metrics_url)
+
+        print("Step 2: scale the deployment down")
+        scale_deployment(kube_apis.v1, kube_apis.apps_v1_api, "backend1", virtual_server_setup.namespace, 1)
+        assert_pods_scaled_to_count(kube_apis.apps_v1_api, kube_apis.v1, "backend1", virtual_server_setup.namespace, 1)
+
+        print("Step 3: scale the deployment up")
+        scale_deployment(kube_apis.v1, kube_apis.apps_v1_api, "backend1", virtual_server_setup.namespace, 4)
+        assert_pods_scaled_to_count(kube_apis.apps_v1_api, kube_apis.v1, "backend1", virtual_server_setup.namespace, 4)
+
+        print("Step 4: get reload count after scaling")
+        reload_count_after_scaling = get_reload_count(virtual_server_setup.metrics_url)
+
+        assert reload_count_after_scaling == initial_reload_count, "Expected: no new reloads"
diff --git a/tests/suite/utils/custom_assertions.py b/tests/suite/utils/custom_assertions.py
index 39474eb522..6cfc926886 100644
--- a/tests/suite/utils/custom_assertions.py
+++ b/tests/suite/utils/custom_assertions.py
@@ -268,3 +268,36 @@ def assert_proxy_entries_exist(config) -> None:
     assert "proxy_next_upstream error timeout;" in config
     assert "proxy_next_upstream_timeout 0s;" in config
     assert "proxy_next_upstream_tries 0;" in config
+
+
+def assert_pods_scaled_to_count(apps_v1_api, v1, deployment_name, namespace, expected_count, timeout=60, interval=1):
+    """
+    Check if the number of pods for a given deployment has scaled to the expected count.
+
+    :param apps_v1_api: AppsV1Api
+    :param v1: CoreV1Api
+    :param deployment_name: name of the deployment to check.
+    :param namespace: namespace of the deployment.
+    :param expected_count: expected number of pods after scaling.
+    :param timeout: Maximum time to wait for the expected count to be met.
+    :param interval: Time to wait between checks.
+    """
+    end_time = time.time() + timeout
+    while time.time() < end_time:
+        selector = ",".join(
+            [
+                f"{key}={value}"
+                for key, value in apps_v1_api.read_namespaced_deployment(
+                    deployment_name, namespace
+                ).spec.selector.match_labels.items()
+            ]
+        )
+        pods = v1.list_namespaced_pod(namespace, label_selector=selector)
+        pod_count = len(pods.items)
+        if pod_count == expected_count:
+            print(f"Expected {expected_count} pods, found {pod_count} for '{deployment_name}' in '{namespace}'.")
+            return
+        time.sleep(interval)
+    assert (
+        False
+    ), f"Expected {expected_count} pods, but found {pod_count} for '{deployment_name}' in '{namespace}' after {timeout} seconds."