From 0f859af1a30793a04b5465333dc6e42225ca59d9 Mon Sep 17 00:00:00 2001 From: "Jung-Yu (Gina) Yeh" Date: Mon, 28 Aug 2023 23:02:16 +0000 Subject: [PATCH] GAMMA and Stateful Session Affinity tests --- tools/run_tests/run_xds_tests.py | 3 +- tools/run_tests/xds_k8s_test_driver/README.md | 4 +- .../framework/helpers/grpc.py | 20 +- .../framework/infrastructure/gcp/compute.py | 3 + .../framework/infrastructure/k8s.py | 139 +++++++++++++- .../infrastructure/traffic_director.py | 31 ++++ .../framework/rpc/grpc_testing.py | 5 +- .../framework/test_app/client_app.py | 5 +- .../runners/k8s/gamma_server_runner.py | 53 +++++- .../test_app/runners/k8s/k8s_base_runner.py | 171 +++++++++++++++--- .../framework/xds_gamma_testcase.py | 5 +- .../framework/xds_k8s_testcase.py | 21 ++- .../gamma/backend_policy.yaml | 17 ++ .../gamma/route_http.yaml | 25 +++ .../gamma/route_http_3.yaml | 30 +++ .../gamma/route_http_gamma.yaml | 21 +++ .../gamma/route_http_ssafilter.yaml | 27 +++ .../gamma/session_affinity_filter.yaml | 10 + .../gamma/session_affinity_policy_route.yaml | 15 ++ .../session_affinity_policy_service.yaml | 14 ++ .../kubernetes-manifests/gamma/tdmesh.yaml | 8 - .../server.deployment.yaml | 1 + .../tests/baseline_test.py | 6 +- .../tests/gamma/affinity_failover_test.py | 93 ++++++++++ .../tests/gamma/affinity_filter_test.py | 75 ++++++++ .../affinity_policy_target_route_test.py | 75 ++++++++ .../affinity_policy_target_service_test.py | 76 ++++++++ 27 files changed, 901 insertions(+), 52 deletions(-) create mode 100644 tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/backend_policy.yaml create mode 100644 tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/route_http.yaml create mode 100644 tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/route_http_3.yaml create mode 100644 tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/route_http_gamma.yaml create mode 100644 tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/route_http_ssafilter.yaml create mode 100644 tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/session_affinity_filter.yaml create mode 100644 tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/session_affinity_policy_route.yaml create mode 100644 tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/session_affinity_policy_service.yaml create mode 100644 tools/run_tests/xds_k8s_test_driver/tests/gamma/affinity_failover_test.py create mode 100644 tools/run_tests/xds_k8s_test_driver/tests/gamma/affinity_filter_test.py create mode 100644 tools/run_tests/xds_k8s_test_driver/tests/gamma/affinity_policy_target_route_test.py create mode 100644 tools/run_tests/xds_k8s_test_driver/tests/gamma/affinity_policy_target_service_test.py diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py index 9c33ba10b3e56..533ea862a2d8a 100755 --- a/tools/run_tests/run_xds_tests.py +++ b/tools/run_tests/run_xds_tests.py @@ -427,7 +427,7 @@ def parse_port_range(port_arg): _SPONGE_XML_NAME = "sponge_log.xml" -def get_client_stats(num_rpcs, timeout_sec): +def get_client_stats(num_rpcs, timeout_sec, metadata): if CLIENT_HOSTS: hosts = CLIENT_HOSTS else: @@ -440,6 +440,7 @@ def get_client_stats(num_rpcs, timeout_sec): request = messages_pb2.LoadBalancerStatsRequest() request.num_rpcs = num_rpcs request.timeout_sec = timeout_sec + request.metadata = metadata rpc_timeout = timeout_sec + _CONNECTION_TIMEOUT_SEC logger.debug( "Invoking GetClientStats RPC to %s:%d:", host, args.stats_port diff --git a/tools/run_tests/xds_k8s_test_driver/README.md b/tools/run_tests/xds_k8s_test_driver/README.md index ed1aeb2a42dd7..d7bf247ea39d1 100644 --- a/tools/run_tests/xds_k8s_test_driver/README.md +++ b/tools/run_tests/xds_k8s_test_driver/README.md @@ -38,7 +38,7 @@ sudo apt-get install python3-venv ##### Getting Started 1. If you haven't, [initialize](https://cloud.google.com/sdk/docs/install-sdk) gcloud SDK -2. Activate gcloud [configuration](https://cloud.google.com/sdk/docs/configurations) with your project +2. Activate gcloud [configuration](https://cloud.google.com/sdk/docs/configurations) with your project 3. Enable gcloud services: ```shell gcloud services enable \ @@ -54,7 +54,6 @@ sudo apt-get install python3-venv #### Configure GKE cluster This is an example outlining minimal requirements to run the [baseline tests](xds-baseline-tests). - Update gloud sdk: ```shell gcloud -q components update @@ -91,7 +90,6 @@ gcloud container clusters create "${CLUSTER_NAME}" \ --workload-metadata=GKE_METADATA \ --tags=allow-health-checks ``` - For security tests you also need to create CAs and configure the cluster to use those CAs as described [here](https://cloud.google.com/traffic-director/docs/security-proxyless-setup#configure-cas). diff --git a/tools/run_tests/xds_k8s_test_driver/framework/helpers/grpc.py b/tools/run_tests/xds_k8s_test_driver/framework/helpers/grpc.py index efa0d51c4a7f9..46d779c9b866e 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/helpers/grpc.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/helpers/grpc.py @@ -23,7 +23,8 @@ # Type aliases RpcsByPeer: Dict[str, int] - +RpcMetadata = grpc_testing.LoadBalancerStatsResponse.RpcMetadata +MetadataByPeer: list[str, RpcMetadata] @functools.cache # pylint: disable=no-member def status_from_int(grpc_status_int: int) -> Optional[grpc.StatusCode]: @@ -131,6 +132,8 @@ class PrettyLoadBalancerStats: # } rpcs_by_method: Dict[str, "RpcsByPeer"] + metadatas_by_peer: Dict[str, "MetadataByPeer"] + @staticmethod def _parse_rpcs_by_peer( rpcs_by_peer: grpc_testing.RpcsByPeer, @@ -140,6 +143,19 @@ def _parse_rpcs_by_peer( result[peer] = count return result + @staticmethod + def _parse_metadatas_by_peer( + metadatas_by_peer: grpc_testing.LoadBalancerStatsResponse.MetadataByPeer, + ) -> "MetadataByPeer": + result = dict() + for peer, metadatas in metadatas_by_peer.items(): + pretty_metadata = "" + for metadatas in metadatas.rpc_metadata: + for metadata in metadatas.metadata: + pretty_metadata += metadata.key + ": " + metadata.value + ", " + result[peer] = pretty_metadata + return result + @classmethod def from_response( cls, lb_stats: grpc_testing.LoadBalancerStatsResponse @@ -154,9 +170,9 @@ def from_response( num_failures=lb_stats.num_failures, rpcs_by_peer=cls._parse_rpcs_by_peer(lb_stats.rpcs_by_peer), rpcs_by_method=rpcs_by_method, + metadatas_by_peer=cls._parse_metadatas_by_peer(lb_stats.metadatas_by_peer), ) - def lb_stats_pretty(lb: grpc_testing.LoadBalancerStatsResponse) -> str: """Pretty print LoadBalancerStatsResponse. diff --git a/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/compute.py b/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/compute.py index 2fa738d2de53d..c31eb279c926e 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/compute.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/compute.py @@ -60,6 +60,9 @@ class BackendServiceProtocol(enum.Enum): HTTP2 = enum.auto() GRPC = enum.auto() + class SessionAffinityProtocol(enum.Enum): + HTTP = enum.auto() + def create_health_check( self, name: str, diff --git a/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/k8s.py b/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/k8s.py index 39fb36eb034f4..0509c707c7162 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/k8s.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/k8s.py @@ -54,6 +54,10 @@ DynResourceInstance = dynamic_res.ResourceInstance GammaMesh = DynResourceInstance GammaGrpcRoute = DynResourceInstance +GammaHttpRoute = DynResourceInstance +GcpSessionAffinityPolicy = DynResourceInstance +GcpSessionAffinityFilter = DynResourceInstance +GcpBackendPolicy = DynResourceInstance _timedelta = datetime.timedelta _ApiException = client.ApiException @@ -161,6 +165,18 @@ def grpc_route(self, version: str) -> dynamic_res.Resource: return self._load_dynamic_api(api_name, version, kind) + @functools.cache # pylint: disable=no-member + def http_route(self, version: str) -> dynamic_res.Resource: + api_name = "gateway.networking.k8s.io" + kind = "HTTPRoute" + supported_versions = {"v1beta1", "v1alpha2"} + if version not in supported_versions: + raise NotImplementedError( + f"{kind} {api_name}/{version} not implemented." + ) + + return self._load_dynamic_api(api_name, version, kind) + def close(self): # TODO(sergiitk): [GAMMA] what to do with dynamic clients? self.client.close() @@ -243,6 +259,34 @@ def api_grpc_route(self) -> dynamic_res.Resource: "GRPCRoute", ) + @functools.cached_property # pylint: disable=no-member + def api_http_route(self) -> dynamic_res.Resource: + return self._get_dynamic_api( + "gateway.networking.k8s.io/v1alpha2", + "HTTPRoute", + ) + + @functools.cached_property # pylint: disable=no-member + def api_session_affinity_policy(self) -> dynamic_res.Resource: + return self._get_dynamic_api( + "networking.gke.io/v1", + "GCPSessionAffinityPolicy", + ) + + @functools.cached_property # pylint: disable=no-member + def api_session_affinity_filter(self) -> dynamic_res.Resource: + return self._get_dynamic_api( + "networking.gke.io/v1", + "GCPSessionAffinityFilter", + ) + + @functools.cached_property # pylint: disable=no-member + def api_backend_policy(self) -> dynamic_res.Resource: + return self._get_dynamic_api( + "networking.gke.io/v1", + "GCPBackendPolicy", + ) + def _refresh_auth(self): logger.info("Reloading k8s api client to refresh the auth.") self._api.reload() @@ -276,6 +320,8 @@ def _get_dynamic_api(self, api_version, kind) -> dynamic_res.Resource: elif group == "gateway.networking.k8s.io": if kind == "GRPCRoute": return self._api.grpc_route(version) + elif kind == "HTTPRoute": + return self._api.http_route(version) raise NotImplementedError(f"{kind} {api_version} not implemented.") @@ -445,8 +491,17 @@ def get_service(self, name) -> V1Service: def get_gamma_mesh(self, name) -> Optional[GammaMesh]: return self._get_dyn_resource(self.api_gke_mesh, name) - def get_gamma_route(self, name) -> Optional[GammaGrpcRoute]: - return self._get_dyn_resource(self.api_grpc_route, name) + def get_gamma_route(self, name) -> Optional[GammaHttpRoute]: + return self._get_dyn_resource(self.api_http_route, name) + + def get_session_affinity_policy(self, name) -> Optional[GcpSessionAffinityPolicy]: + return self._get_dyn_resource(self.api_session_affinity_policy, name) + + def get_session_affinity_filter(self, name) -> Optional[GcpSessionAffinityFilter]: + return self._get_dyn_resource(self.api_session_affinity_filter, name) + + def get_backend_policy(self, name) -> Optional[GcpBackendPolicy]: + return self._get_dyn_resource(self.api_backend_policy, name) def get_service_account(self, name) -> V1Service: return self._get_resource( @@ -502,7 +557,46 @@ def delete_gamma_route( # TODO(sergiitk): [GAMMA] Can we call delete on dynamic_res.ResourceList # to avoid no-member issues due to dynamic_res.Resource proxying calls? self._execute( - self.api_grpc_route.delete, # pylint: disable=no-member + self.api_http_route.delete, # pylint: disable=no-member + name=name, + namespace=self.name, + propagation_policy="Foreground", + grace_period_seconds=grace_period_seconds, + ) + + def delete_session_affinity_policy( + self, + name: str, + grace_period_seconds=DELETE_GRACE_PERIOD_SEC, + ) -> None: + self._execute( + self.api_session_affinity_policy.delete, + name=name, + namespace=self.name, + propagation_policy="Foreground", + grace_period_seconds=grace_period_seconds, + ) + + def delete_session_affinity_filter( + self, + name: str, + grace_period_seconds=DELETE_GRACE_PERIOD_SEC, + ) -> None: + self._execute( + self.api_session_affinity_filter.delete, + name=name, + namespace=self.name, + propagation_policy="Foreground", + grace_period_seconds=grace_period_seconds, + ) + + def delete_backend_policy( + self, + name: str, + grace_period_seconds=DELETE_GRACE_PERIOD_SEC, + ) -> None: + self._execute( + self.api_backend_policy.delete, name=name, namespace=self.name, propagation_policy="Foreground", @@ -561,6 +655,45 @@ def wait_for_get_gamma_route_deleted( ) retryer(self.get_gamma_route, name) + def wait_for_get_session_affinity_policy_deleted( + self, + name: str, + timeout_sec: int = WAIT_MEDIUM_TIMEOUT_SEC, + wait_sec: int = WAIT_MEDIUM_SLEEP_SEC, + ) -> None: + retryer = retryers.constant_retryer( + wait_fixed=_timedelta(seconds=wait_sec), + timeout=_timedelta(seconds=timeout_sec), + check_result=lambda policy: policy is None, + ) + retryer(self.get_session_affinity_policy, name) + + def wait_for_get_session_affinity_filter_deleted( + self, + name: str, + timeout_sec: int = WAIT_MEDIUM_TIMEOUT_SEC, + wait_sec: int = WAIT_MEDIUM_SLEEP_SEC, + ) -> None: + retryer = retryers.constant_retryer( + wait_fixed=_timedelta(seconds=wait_sec), + timeout=_timedelta(seconds=timeout_sec), + check_result=lambda policy: policy is None, + ) + retryer(self.get_session_affinity_filter, name) + + def wait_for_get_backend_policy_deleted( + self, + name: str, + timeout_sec: int = WAIT_MEDIUM_TIMEOUT_SEC, + wait_sec: int = WAIT_MEDIUM_SLEEP_SEC, + ) -> None: + retryer = retryers.constant_retryer( + wait_fixed=_timedelta(seconds=wait_sec), + timeout=_timedelta(seconds=timeout_sec), + check_result=lambda policy: policy is None, + ) + retryer(self.get_backend_policy, name) + def wait_for_service_account_deleted( self, name: str, diff --git a/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/traffic_director.py b/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/traffic_director.py index c3c5d6815c5c5..202a496302d0e 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/traffic_director.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/infrastructure/traffic_director.py @@ -26,10 +26,12 @@ _ComputeV1 = gcp.compute.ComputeV1 GcpResource = _ComputeV1.GcpResource HealthCheckProtocol = _ComputeV1.HealthCheckProtocol +SessionAffinityProtocol = _ComputeV1.SessionAffinityProtocol ZonalGcpResource = _ComputeV1.ZonalGcpResource BackendServiceProtocol = _ComputeV1.BackendServiceProtocol _BackendGRPC = BackendServiceProtocol.GRPC _HealthCheckGRPC = HealthCheckProtocol.GRPC +_SessionAffinityHTTP = SessionAffinityProtocol.HTTP # Network Security _NetworkSecurityV1Beta1 = gcp.network_security.NetworkSecurityV1Beta1 @@ -65,6 +67,7 @@ class TrafficDirectorManager: # pylint: disable=too-many-public-methods FORWARDING_RULE_NAME = "forwarding-rule" ALTERNATIVE_FORWARDING_RULE_NAME = "forwarding-rule-alt" FIREWALL_RULE_NAME = "allow-health-checks" + SESSION_AFFINITY_NAME = "session-affinity-policy" def __init__( self, @@ -199,6 +202,34 @@ def delete_health_check(self, force=False): self.compute.delete_health_check(name) self.health_check = None + def create_session_affinity_policy( + self, + protocol: Optional[SessionAffinityProtocol] = _SessionAffinityHTTP, + ): + if self.session_affinity_policy: + raise ValueError( + f"Session affinity policy {self.session_affinity_policy.name} " + "already created, delete it first" + ) + if protocol is None: + protocol = _SessionAffinityHTTP + + name = self.make_resource_name(self.SESSION_AFFINITY_NAME) + logger.info('Creating %s Session Affinity Policy "%s"', protocol.name, name) + resource = self.compute.create_session_affinity_policy(name, protocol) + self.session_affinity_policy = resource + + def delete_session_affinity_policy(self, force=False): + if force: + name = self.make_resource_name(self.SESSION_AFFINITY_NAME) + elif self.session_affinity_policy: + name = self.session_affinity_policy.name + else: + return + logger.info('Deleting Session Affinity Policy "%s"', name) + self.compute.delete_session_affinity_policy(name) + self.session_affinity_policy = None + def create_backend_service( self, protocol: Optional[BackendServiceProtocol] = _BackendGRPC, diff --git a/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_testing.py b/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_testing.py index be50a79be892f..b15af8d2f64da 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_testing.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_testing.py @@ -59,6 +59,7 @@ def get_client_stats( *, num_rpcs: int, timeout_sec: Optional[int] = STATS_PARTIAL_RESULTS_TIMEOUT_SEC, + metadata_keys: list[str] = None, ) -> LoadBalancerStatsResponse: if timeout_sec is None: timeout_sec = self.STATS_PARTIAL_RESULTS_TIMEOUT_SEC @@ -66,7 +67,9 @@ def get_client_stats( return self.call_unary_with_deadline( rpc="GetClientStats", req=_LoadBalancerStatsRequest( - num_rpcs=num_rpcs, timeout_sec=timeout_sec + num_rpcs=num_rpcs, + timeout_sec=timeout_sec, + metadata_keys=metadata_keys, ), deadline_sec=timeout_sec, log_level=logging.INFO, diff --git a/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py b/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py index a232e41cd749c..16b2e636cd41a 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py @@ -109,8 +109,11 @@ def get_load_balancer_stats( """ Shortcut to LoadBalancerStatsServiceClient.get_client_stats() """ + logger.info("[gina] client_app.py get_load_balancer_stats") return self.load_balancer_stats.get_client_stats( - num_rpcs=num_rpcs, timeout_sec=timeout_sec + num_rpcs=num_rpcs, + timeout_sec=timeout_sec, + metadata_keys="*", ) def get_load_balancer_accumulated_stats( diff --git a/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/gamma_server_runner.py b/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/gamma_server_runner.py index e6c86775f7cd1..18375281b85f7 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/gamma_server_runner.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/gamma_server_runner.py @@ -31,7 +31,7 @@ class GammaServerRunner(KubernetesServerRunner): # Mutable state. mesh: Optional[k8s.GammaMesh] = None - route: Optional[k8s.GammaGrpcRoute] = None + route: Optional[k8s.GammaHttpRoute] = None # Mesh server_xds_host: str @@ -152,7 +152,7 @@ def run( # Create the route. self.route = self._create_gamma_route( - "gamma/route_grpc.yaml", + "gamma/route_http.yaml", xds_server_uri=self.server_xds_host, route_name=self.route_name, mesh_name=self.mesh_name, @@ -206,6 +206,43 @@ def run( secure_mode=secure_mode, ) + def createSessionAffinityPolicy(self, *, target_type): + if cmp(target_type, "route"): + self.sapolicy_name = "ssa-policy-route" + self.saPolicy = self._create_session_affinity_policy( + "gamma/session_affinity_policy_route.yaml", + session_affinity_policy_name=self.sapolicy_name, + namespace_name=self.k8s_namespace.name, + route_name=self.route_name, + ) + elif cmp(target_type, "service"): + self.sapolicy_name = "ssa-policy-service" + self.saPolicy = self._create_session_affinity_policy( + "gamma/session_affinity_policy_service.yaml", + session_affinity_policy_name=self.sapolicy_name, + namespace_name=self.k8s_namespace.name, + service_name=self.service_name, + ) + else: + return + + def createSessionAffinityFilter(self): + self.safilter_name = "ssa-filter" + self.saFilter = self._create_session_affinity_filter( + "gamma/session_affinity_filter.yaml", + session_affinity_filter_name=self.safilter_name, + namespace_name=self.k8s_namespace.name, + ) + + def createBackendPolicy(self, *, target_type): + self.bepolicy_name = "be-policy" + self.bePolicy = self._create_backend_policy( + "gamma/backend_policy.yaml", + be_policy_name=self.bepolicy_name, + namespace_name=self.k8s_namespace.name, + service_name=self.service_name, + ) + # pylint: disable=arguments-differ def cleanup(self, *, force=False, force_namespace=False): try: @@ -224,6 +261,18 @@ def cleanup(self, *, force=False, force_namespace=False): self._delete_deployment(self.deployment_name) self.deployment = None + if self.saPolicy or force: + self._delete_session_affinity_policy(self.sapolicy_name) + self.saPolicy = None + + if self.saFitler or force: + self._delete_session_affinity_filter(self.safilter_name) + self.saFilter = None + + if self.bePolicy or force: + self._delete_backend_policy(self.bepolicy_name) + self.bePolicy = None + if self.enable_workload_identity and ( self.service_account or force ): diff --git a/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_base_runner.py b/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_base_runner.py index 8049076de04fe..23b2c822f428b 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_base_runner.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/test_app/runners/k8s/k8s_base_runner.py @@ -464,31 +464,126 @@ def _create_gamma_mesh(self, template, **kwargs) -> k8s.GammaMesh: ) return mesh - def _create_gamma_route(self, template, **kwargs) -> k8s.GammaGrpcRoute: + def _create_gamma_route(self, template, **kwargs) -> k8s.GammaHttpRoute: route = self._create_from_template( template, custom_object=True, **kwargs, ) if not ( - isinstance(route, k8s.GammaGrpcRoute) and route.kind == "GRPCRoute" + isinstance(route, k8s.GammaHttpRoute) and route.kind == "HTTPRoute" ): raise _RunnerError( - f"Expected ResourceInstance[GRPCRoute] to be created from" + f"Expected ResourceInstance[HTTPRoute] to be created from" f" manifest {template}" ) if route.metadata.name != kwargs["route_name"]: raise _RunnerError( - "ResourceInstance[GRPCRoute] created with unexpected name: " + "ResourceInstance[HTTPRoute] created with unexpected name: " f"{route.metadata.name}" ) logger.debug( - "ResourceInstance[GRPCRoute] %s created at %s", + "ResourceInstance[HTTPRoute] %s created at %s", route.metadata.name, route.metadata.creation_timestamp, ) return route + def _create_session_affinity_policy(self, template, **kwargs) -> k8s.GcpSessionAffinityPolicy: + saPolicy = self._create_from_template( + template, + custom_object=True, + **kwargs, + ) + if not ( + isinstance(saPolicy, k8s.GcpSessionAffinityPolicy) and saPolicy.kind == "GCPSessionAffinityPolicy" + ): + raise _RunnerError( + f"Expected ResourceInstance[GCPSessionAffinityPolicy] to be" + f" created from manifest {template}" + ) + if saPolicy.metadata.name != kwargs["session_affinity_policy_name"]: + raise _RunnerError( + "ResourceInstance[GCPSessionAffinityPolicy] created with" + f" unexpected name: {saPolicy.metadata.name}" + ) + logger.debug( + "ResourceInstance[GCPSessionAffinityPolicy] %s created at %s", + saPolicy.metadata.name, + saPolicy.metadata.creation_timestamp, + ) + return saPolicy + + def _create_session_affinity_filter(self, template, **kwargs) -> k8s.GcpSessionAffinityFilter: + saFilter = self._create_from_template( + template, + custom_object=True, + **kwargs, + ) + if not ( + isinstance(saFilter, k8s.GcpSessionAffinityFilter) and saFilter.kind == "GCPSessionAffinityFilter" + ): + raise _RunnerError( + f"Expected ResourceInstance[GCPSessionAffinityFilter] to be" + f" created from manifest {template}" + ) + if saFilter.metadata.name != kwargs["session_affinity_filter_name"]: + raise _RunnerError( + "ResourceInstance[GCPSessionAffinityFilter] created with" + f" unexpected name: {saFilter.metadata.name}" + ) + logger.debug( + "ResourceInstance[GCPSessionAffinityFilter] %s created at %s", + saFilter.metadata.name, + saFilter.metadata.creation_timestamp, + ) + return saFilter + + def _create_backend_policy(self, template, **kwargs) -> k8s.GcpBackendPolicy: + saPolicy = self._create_from_template( + template, + custom_object=True, + **kwargs, + ) + if not ( + isinstance(bePolicy, k8s.GcpBackendPolicy) and bePolicy.kind == "GCPBackendPolicy" + ): + raise _RunnerError( + f"Expected ResourceInstance[GCPBackendPolicy] to be" + f" created from manifest {template}" + ) + if bePolicy.metadata.name != kwargs["be_policy_name"]: + raise _RunnerError( + "ResourceInstance[GCPBackendPolicy] created with" + f" unexpected name: {bePolicy.metadata.name}" + ) + logger.debug( + "ResourceInstance[GCPBackendPolicy] %s created at %s", + bePolicy.metadata.name, + bePolicy.metadata.creation_timestamp, + ) + return bePolicy + + def _create_service(self, template, **kwargs) -> k8s.V1Service: + service = self._create_from_template(template, **kwargs) + if not isinstance(service, k8s.V1Service): + raise _RunnerError( + f"Expected V1Service to be created from manifest {template}" + ) + if service.metadata.name != kwargs["service_name"]: + raise _RunnerError( + "V1Service created with unexpected name: " + f"{service.metadata.name}" + ) + logger.debug( + "V1Service %s created at %s", + service.metadata.self_link, + service.metadata.creation_timestamp, + ) + return service + + + def _delete_gamma_mesh(self, name, wait_for_deletion=True): logger.info("Deleting GAMMA mesh %s", name) try: @@ -502,34 +597,52 @@ def _delete_gamma_mesh(self, name, wait_for_deletion=True): logger.debug("GAMMA mesh %s deleted", name) def _delete_gamma_route(self, name, wait_for_deletion=True): - logger.info("Deleting GRPCRoute %s", name) + logger.info("Deleting HTTPRoute %s", name) try: self.k8s_namespace.delete_gamma_route(name) except (retryers.RetryError, k8s.NotFound) as e: - logger.info("GRPCRoute %s deletion failed: %s", name, e) + logger.info("HTTPRoute %s deletion failed: %s", name, e) return if wait_for_deletion: self.k8s_namespace.wait_for_get_gamma_route_deleted(name) - logger.debug("GRPCRoute %s deleted", name) + logger.debug("HTTPRoute %s deleted", name) - def _create_service(self, template, **kwargs) -> k8s.V1Service: - service = self._create_from_template(template, **kwargs) - if not isinstance(service, k8s.V1Service): - raise _RunnerError( - f"Expected V1Service to be created from manifest {template}" - ) - if service.metadata.name != kwargs["service_name"]: - raise _RunnerError( - "V1Service created with unexpected name: " - f"{service.metadata.name}" - ) - logger.debug( - "V1Service %s created at %s", - service.metadata.self_link, - service.metadata.creation_timestamp, - ) - return service + def _delete_session_affinity_policy(self, name, wait_for_deletion=True): + logger.info("Deleting GCPSessionAffinityPolicy %s", name) + try: + self.k8s_namespace.delete_session_affinity_policy(name) + except (retryers.RetryError, k8s.NotFound) as e: + logger.info("GCPSessionAffinityPolicy %s deletion failed: %s", name, e) + return + + if wait_for_deletion: + self.k8s_namespace.wait_for_get_session_affinity_policy_deleted(name) + logger.debug("GCPSessionAffinityPolicy %s deleted", name) + + def _delete_session_affinity_filter(self, name, wait_for_deletion=True): + logger.info("Deleting GCPSessionAffinityFilter %s", name) + try: + self.k8s_namespace.delete_session_affinity_filter(name) + except (retryers.RetryError, k8s.NotFound) as e: + logger.info("GCPSessionAffinityFilter %s deletion failed: %s", name, e) + return + + if wait_for_deletion: + self.k8s_namespace.wait_for_get_session_affinity_filter_deleted(name) + logger.debug("GCPSessionAffinityFilter %s deleted", name) + + def _delete_backend_policy(self, name, wait_for_deletion=True): + logger.info("Deleting GCPBackendPolicy %s", name) + try: + self.k8s_namespace.delete_backend_policy(name) + except (retryers.RetryError, k8s.NotFound) as e: + logger.info("GGCPBackendPolicy %s deletion failed: %s", name, e) + return + + if wait_for_deletion: + self.k8s_namespace.wait_for_get_backend_policy_deleted(name) + logger.debug("GCPBackendPolicy %s deleted", name) def _delete_deployment(self, name, wait_for_deletion=True): logger.info("Deleting deployment %s", name) @@ -756,3 +869,11 @@ def _make_namespace_name( if resource_suffix: parts.append(resource_suffix) return "-".join(parts) +# Copyright 2022 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# diff --git a/tools/run_tests/xds_k8s_test_driver/framework/xds_gamma_testcase.py b/tools/run_tests/xds_k8s_test_driver/framework/xds_gamma_testcase.py index bf59421af3cf3..5c16e82c27c36 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/xds_gamma_testcase.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/xds_gamma_testcase.py @@ -96,7 +96,7 @@ def initKubernetesServerRunner(self) -> GammaServerRunner: k8s.KubernetesNamespace( self.k8s_api_manager, self.server_namespace ), - mesh_name=self.mesh_name, +# mesh_name=self.mesh_name, server_xds_host=self.server_xds_host, deployment_name=self.server_name, image_name=self.server_image, @@ -114,5 +114,6 @@ def startTestClient( self, test_server: XdsTestServer, **kwargs ) -> XdsTestClient: return super().startTestClient( - test_server, config_mesh=self.mesh_name_td + # test_server, config_mesh=self.mesh_name_td + test_server ) diff --git a/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py b/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py index 6e460984ac23f..d71b551975254 100644 --- a/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py +++ b/tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py @@ -74,6 +74,8 @@ _ChannelState = grpc_channelz.ChannelState _timedelta = datetime.timedelta ClientConfig = grpc_csds.ClientConfig +RpcMetadata = grpc_testing.LoadBalancerStatsResponse.RpcMetadata +MetadataByPeer: list[str, RpcMetadata] # pylint complains about signal.Signals for some reason. _SignalNum = Union[int, signal.Signals] # pylint: disable=no-member _SignalHandler = Callable[[_SignalNum, Optional[FrameType]], Any] @@ -310,9 +312,23 @@ def removeServerBackends(self, *, server_runner=None): # Remove backends from the Backend Service self.td.backend_service_remove_neg_backends(neg_name, neg_zones) + def parseMetadata( + self, metadatas_by_peer: dict[str, "MetadataByPeer"] + ) -> dict[str]: + cookies = dict() + p = "" + for peer, metadatas in metadatas_by_peer.items(): + p = peer + for metadatas in metadatas.rpc_metadata: + for metadata in metadatas.metadata: + if metadata.key == "cookie": + cookies[peer] = metadata.value + return cookies + + def assertSuccessfulRpcs( self, test_client: XdsTestClient, num_rpcs: int = 100 - ): + ) -> dict[str]: lb_stats = self.getClientRpcStats(test_client, num_rpcs) self.assertAllBackendsReceivedRpcs(lb_stats) failed = int(lb_stats.num_failures) @@ -321,6 +337,9 @@ def assertSuccessfulRpcs( 0, msg=f"Expected all RPCs to succeed: {failed} of {num_rpcs} failed", ) + if lb_stats.metadatas_by_peer is None: + return None + return self.parseMetadata(lb_stats.metadatas_by_peer) @staticmethod def diffAccumulatedStatsPerMethod( diff --git a/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/backend_policy.yaml b/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/backend_policy.yaml new file mode 100644 index 0000000000000..872b4ef7689e7 --- /dev/null +++ b/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/backend_policy.yaml @@ -0,0 +1,17 @@ +--- +kind: GCPBackendPolicy +apiVersion: networking.gke.io/v1 +metadata: + name: ${be_policy_name} + namespace: ${namespace_name} + labels: + owner: xds-k8s-interop-test +spec: + targetRef: + group: "" + kind: Service + name: ${service_name} + default: + connectionDraining: + drainingTimeoutSec: 600 +... diff --git a/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/route_http.yaml b/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/route_http.yaml new file mode 100644 index 0000000000000..5ed56e629abd9 --- /dev/null +++ b/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/route_http.yaml @@ -0,0 +1,25 @@ +--- +kind: HTTPRoute +apiVersion: gateway.networking.k8s.io/v1beta1 +metadata: + name: ${route_name} + namespace: ${namespace_name} + labels: + owner: xds-k8s-interop-test +spec: + parentRefs: + - name: ${mesh_name} + namespace: ${namespace_name} + group: net.gke.io + kind: TDMesh + hostnames: + - ${xds_server_uri} + rules: + - matches: + - path: + type: Exact + value: /grpc.testing.TestService/UnaryCall + backendRefs: + - name: ${service_name} + port: 50051 +... diff --git a/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/route_http_3.yaml b/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/route_http_3.yaml new file mode 100644 index 0000000000000..3ec50909abdd9 --- /dev/null +++ b/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/route_http_3.yaml @@ -0,0 +1,30 @@ +--- +kind: HTTPRoute +apiVersion: gateway.networking.k8s.io/v1beta1 +metadata: + name: ${route_name} + namespace: ${namespace_name} + labels: + owner: xds-k8s-interop-test +spec: + parentRefs: + - name: ${service_name_1} + kind: Service + rules: + - matches: + - path: + type: Exact + value: /grpc.testing.TestService/UnaryCall + backendRefs: + - name: ${service_name_2} + port: 50051 + - name: ${service_name_3} + port: 50051 + - matches: + - path: + type: Exact + value: /grpc.testing/TestService/EmptyCall + backendRefs: + - name: ${service_name_2} + port: 8080 +... diff --git a/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/route_http_gamma.yaml b/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/route_http_gamma.yaml new file mode 100644 index 0000000000000..df758f8256ed3 --- /dev/null +++ b/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/route_http_gamma.yaml @@ -0,0 +1,21 @@ +--- +kind: HTTPRoute +apiVersion: gateway.networking.k8s.io/v1alpha2 +metadata: + name: ${route_name} + namespace: ${namespace_name} + labels: + owner: xds-k8s-interop-test +spec: + parentRefs: + - name: ${service_name} + kind: Service + rules: + - matches: + - path: + type: Exact + value: /grpc.testing.TestService/UnaryCall + backendRefs: + - name: ${service_name} + port: 50051 +... diff --git a/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/route_http_ssafilter.yaml b/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/route_http_ssafilter.yaml new file mode 100644 index 0000000000000..18e326a304395 --- /dev/null +++ b/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/route_http_ssafilter.yaml @@ -0,0 +1,27 @@ +--- +kind: HTTPRoute +apiVersion: gateway.networking.k8s.io/v1beta1 +metadata: + name: ${route_name} + namespace: ${namespace_name} + labels: + owner: xds-k8s-interop-test +spec: + parentRefs: + - name: ${service_name_1} + kind: Service + rules: + - matches: + - path: + type: Exact + value: /grpc.testing.TestService/UnaryCall + filters: + - type: ExtensionRef + extensionRef: + group: networking.gke.io + kind: GCPSessionAffinityFilter + name: ${session_affinity_filter_name} + backendRefs: + - name: ${service_name_2} + port: 50051 +... diff --git a/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/session_affinity_filter.yaml b/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/session_affinity_filter.yaml new file mode 100644 index 0000000000000..e0370561dd4e5 --- /dev/null +++ b/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/session_affinity_filter.yaml @@ -0,0 +1,10 @@ +--- +apiVersion: networking.gke.io/v1 +kind: GCPSessionAffinityFilter +metadata: + name: ${session_affinity_filter_name} + namespace: ${namespace_name} +spec: + statefulGeneratedCookie: + cookieTtlSeconds: 50 +... diff --git a/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/session_affinity_policy_route.yaml b/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/session_affinity_policy_route.yaml new file mode 100644 index 0000000000000..7042b54bd51bc --- /dev/null +++ b/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/session_affinity_policy_route.yaml @@ -0,0 +1,15 @@ +--- +apiVersion: networking.gke.io/v1 +kind: GCPSessionAffinityPolicy +metadata: + name: ${session_affinity_policy_name} + namespace: ${namespace_name} +spec: + statefulGeneratedCookie: + cookieTtlSeconds: 50 + targetRef: + name: ${route_name} + group: gateway.networking.k8s.io + kind: HTTPRoute + namespace: ${namespace_name} +... diff --git a/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/session_affinity_policy_service.yaml b/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/session_affinity_policy_service.yaml new file mode 100644 index 0000000000000..ba94779d24882 --- /dev/null +++ b/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/session_affinity_policy_service.yaml @@ -0,0 +1,14 @@ +--- +apiVersion: networking.gke.io/v1 +kind: GCPSessionAffinityPolicy +metadata: + name: ${session_affinity_policy_name} + namespace: ${namespace_name} +spec: + statefulGeneratedCookie: + cookieTtlSeconds: 50 + targetRef: + name: ${service_name} + kind: Service + namespace: ${namespace_name} +... diff --git a/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/tdmesh.yaml b/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/tdmesh.yaml index 823014ae66d29..19a591805e0f1 100644 --- a/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/tdmesh.yaml +++ b/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/gamma/tdmesh.yaml @@ -8,12 +8,4 @@ metadata: owner: xds-k8s-interop-test spec: gatewayClassName: gke-td - allowedRoutes: - namespaces: - from: All - kinds: - - group: net.gke.io - # This is intentionally incorrect and should be set to GRPCRoute. - # TODO(sergiitk): [GAMMA] Change when the fix is ready. - kind: TDGRPCRoute ... diff --git a/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/server.deployment.yaml b/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/server.deployment.yaml index 0d6501a2daae4..15d7585b72214 100644 --- a/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/server.deployment.yaml +++ b/tools/run_tests/xds_k8s_test_driver/kubernetes-manifests/server.deployment.yaml @@ -55,6 +55,7 @@ spec: requests: cpu: 100m memory: 512Mi + terminationGracePeriodSeconds: 600 initContainers: - name: grpc-td-init image: ${td_bootstrap_image} diff --git a/tools/run_tests/xds_k8s_test_driver/tests/baseline_test.py b/tools/run_tests/xds_k8s_test_driver/tests/baseline_test.py index bf1bcf31acc44..deb82da105a83 100644 --- a/tools/run_tests/xds_k8s_test_driver/tests/baseline_test.py +++ b/tools/run_tests/xds_k8s_test_driver/tests/baseline_test.py @@ -17,6 +17,7 @@ from absl.testing import absltest from framework import xds_k8s_testcase +from framework import xds_url_map_testcase logger = logging.getLogger(__name__) flags.adopt_module_key_flags(xds_k8s_testcase) @@ -24,7 +25,7 @@ # Type aliases _XdsTestServer = xds_k8s_testcase.XdsTestServer _XdsTestClient = xds_k8s_testcase.XdsTestClient - +RpcTypeUnaryCall = xds_url_map_testcase.RpcTypeUnaryCall class BaselineTest(xds_k8s_testcase.RegularXdsKubernetesTestCase): def test_traffic_director_grpc_setup(self): @@ -45,7 +46,7 @@ def test_traffic_director_grpc_setup(self): with self.subTest("5_start_test_server"): test_server: _XdsTestServer = self.startTestServers()[0] - +# with self.subTest("6_add_server_backends_to_backend_service"): self.setupServerBackends() @@ -58,6 +59,5 @@ def test_traffic_director_grpc_setup(self): with self.subTest("9_test_server_received_rpcs_from_test_client"): self.assertSuccessfulRpcs(test_client) - if __name__ == "__main__": absltest.main(failfast=True) diff --git a/tools/run_tests/xds_k8s_test_driver/tests/gamma/affinity_failover_test.py b/tools/run_tests/xds_k8s_test_driver/tests/gamma/affinity_failover_test.py new file mode 100644 index 0000000000000..0b035c4b0a114 --- /dev/null +++ b/tools/run_tests/xds_k8s_test_driver/tests/gamma/affinity_failover_test.py @@ -0,0 +1,93 @@ +# Copyright 2023 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging +from collections import namedtuple + +from absl import flags +from absl.testing import absltest + +from framework import xds_gamma_testcase +from framework import xds_k8s_testcase +from framework import xds_url_map_testcase + +logger = logging.getLogger(__name__) +flags.adopt_module_key_flags(xds_k8s_testcase) + +_XdsTestServer = xds_k8s_testcase.XdsTestServer +_XdsTestClient = xds_k8s_testcase.XdsTestClient +RpcTypeUnaryCall = xds_url_map_testcase.RpcTypeUnaryCall + +class AffinityTest(xds_gamma_testcase.GammaXdsKubernetesTestCase): + def test_ping_pong(self): + REPLICA_COUNT = 3 + + test_servers: List[_XdsTestServer] + with self.subTest("01_run_test_server"): + test_servers = self.startTestServers(replica_count=REPLICA_COUNT) + + with self.subTest("02_create_ssa_policy"): + self.server_runner.createSessionAffinityPolicy("route") + + with self.subTest("03_create_backend_policy"): + self.server_runner.createBackendPolicy() + + # Default is round robin LB policy. + + with self.subTest("04_start_test_client"): + test_client: _XdsTestClient = self.startTestClient(test_servers[0]) + + cookie = "" + hostname = "" + chosenServerIdx = None + with self.subTest("04_send_first_RPC_and_retrieve_cookie"): + cookies = self.assertSuccessfulRpcs(test_client, 1) + print(cookies) + hostname = next(iter(cookies)) + cookie = cookies[hostname] + for idx, server in enumerate(test_servers): + if server.hostname == hostname: + chosenServerIdx = idx + break + + with self.subTest("05_send_RPCs_with_cookie"): + test_client.update_config.configure( + rpc_types=(RpcTypeUnaryCall,), + metadata=( + ( + RpcTypeUnaryCall, + "cookie", + cookie, + ), + ), + ) + self.assertRpcsEventuallyGoToGivenServers( + test_client, test_servers[chosenServerIdx:chosenServerIdx+1], 10 + ) + + with self.subTest("06_set_server_to_unhealthy"): + test_servers[chosenServerIdx].set_not_serving() + healthy_servers = test_servers; + healthy_servers.pop(chosenServerIdx); + self.assertRpcsEventuallyGoToGivenServers( + test_client, healthy_servers, 10 + ) + + with self.subTest("07_set_server_back_to_healthy"): + test_servers[chosenServerIdx].set_serving() + self.assertRpcsEventuallyGoToGivenServers( + test_client, test_servers[chosenServerIdx:chosenServerIdx+1], 10 + ) + +if __name__ == "__main__": + absltest.main(failfast=True) diff --git a/tools/run_tests/xds_k8s_test_driver/tests/gamma/affinity_filter_test.py b/tools/run_tests/xds_k8s_test_driver/tests/gamma/affinity_filter_test.py new file mode 100644 index 0000000000000..662191e21cf97 --- /dev/null +++ b/tools/run_tests/xds_k8s_test_driver/tests/gamma/affinity_filter_test.py @@ -0,0 +1,75 @@ +# Copyright 2023 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging + +from absl import flags +from absl.testing import absltest + +from framework import xds_gamma_testcase +from framework import xds_k8s_testcase +from framework import xds_url_map_testcase + +logger = logging.getLogger(__name__) +flags.adopt_module_key_flags(xds_k8s_testcase) + +_XdsTestServer = xds_k8s_testcase.XdsTestServer +_XdsTestClient = xds_k8s_testcase.XdsTestClient +RpcTypeUnaryCall = xds_url_map_testcase.RpcTypeUnaryCall + +class AffinityTest(xds_gamma_testcase.GammaXdsKubernetesTestCase): + def test_ping_pong(self): + REPLICA_COUNT = 3 + + test_servers: List[_XdsTestServer] + with self.subTest("01_run_test_server"): + test_servers = self.startTestServers(replica_count=REPLICA_COUNT) + + with self.subTest("02_create_ssa_filter"): + self.server_runner.createSessionAffinityFilter() + + # Default is round robin LB policy. + + with self.subTest("03_start_test_client"): + test_client: _XdsTestClient = self.startTestClient(test_servers[0]) + + cookie = "" + hostname = "" + chosenServerIdx = None + with self.subTest("04_send_first_RPC_and_retrieve_cookie"): + cookies = self.assertSuccessfulRpcs(test_client, 1) + print(cookies) + hostname = next(iter(cookies)) + cookie = cookies[hostname] + for idx, server in enumerate(test_servers): + if server.hostname == hostname: + chosenServerIdx = idx + break + + with self.subTest("05_send_RPCs_with_cookie"): + test_client.update_config.configure( + rpc_types=(RpcTypeUnaryCall,), + metadata=( + ( + RpcTypeUnaryCall, + "cookie", + cookie, + ), + ), + ) + self.assertRpcsEventuallyGoToGivenServers( + test_client, test_servers[chosenServerIdx:chosenServerIdx+1], 10 + ) + +if __name__ == "__main__": + absltest.main(failfast=True) diff --git a/tools/run_tests/xds_k8s_test_driver/tests/gamma/affinity_policy_target_route_test.py b/tools/run_tests/xds_k8s_test_driver/tests/gamma/affinity_policy_target_route_test.py new file mode 100644 index 0000000000000..96af183d01552 --- /dev/null +++ b/tools/run_tests/xds_k8s_test_driver/tests/gamma/affinity_policy_target_route_test.py @@ -0,0 +1,75 @@ +# Copyright 2023 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging + +from absl import flags +from absl.testing import absltest + +from framework import xds_gamma_testcase +from framework import xds_k8s_testcase +from framework import xds_url_map_testcase + +logger = logging.getLogger(__name__) +flags.adopt_module_key_flags(xds_k8s_testcase) + +_XdsTestServer = xds_k8s_testcase.XdsTestServer +_XdsTestClient = xds_k8s_testcase.XdsTestClient +RpcTypeUnaryCall = xds_url_map_testcase.RpcTypeUnaryCall + +class AffinityTest(xds_gamma_testcase.GammaXdsKubernetesTestCase): + def test_ping_pong(self): + REPLICA_COUNT = 3 + + test_servers: List[_XdsTestServer] + with self.subTest("01_run_test_server"): + test_servers = self.startTestServers(replica_count=REPLICA_COUNT) + + with self.subTest("02_create_ssa_policy"): + self.server_runner.createSessionAffinityPolicy("route") + + # Default is round robin LB policy. + + with self.subTest("03_start_test_client"): + test_client: _XdsTestClient = self.startTestClient(test_servers[0]) + + cookie = "" + hostname = "" + chosenServerIdx = None + with self.subTest("04_send_first_RPC_and_retrieve_cookie"): + cookies = self.assertSuccessfulRpcs(test_client, 1) + print(cookies) + hostname = next(iter(cookies)) + cookie = cookies[hostname] + for idx, server in enumerate(test_servers): + if server.hostname == hostname: + chosenServerIdx = idx + break + + with self.subTest("05_send_RPCs_with_cookie"): + test_client.update_config.configure( + rpc_types=(RpcTypeUnaryCall,), + metadata=( + ( + RpcTypeUnaryCall, + "cookie", + cookie, + ), + ), + ) + self.assertRpcsEventuallyGoToGivenServers( + test_client, test_servers[chosenServerIdx:chosenServerIdx+1], 10 + ) + +if __name__ == "__main__": + absltest.main(failfast=True) diff --git a/tools/run_tests/xds_k8s_test_driver/tests/gamma/affinity_policy_target_service_test.py b/tools/run_tests/xds_k8s_test_driver/tests/gamma/affinity_policy_target_service_test.py new file mode 100644 index 0000000000000..521c68aae1046 --- /dev/null +++ b/tools/run_tests/xds_k8s_test_driver/tests/gamma/affinity_policy_target_service_test.py @@ -0,0 +1,76 @@ +# Copyright 2023 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging +from collections import namedtuple + +from absl import flags +from absl.testing import absltest + +from framework import xds_gamma_testcase +from framework import xds_k8s_testcase +from framework import xds_url_map_testcase + +logger = logging.getLogger(__name__) +flags.adopt_module_key_flags(xds_k8s_testcase) + +_XdsTestServer = xds_k8s_testcase.XdsTestServer +_XdsTestClient = xds_k8s_testcase.XdsTestClient +RpcTypeUnaryCall = xds_url_map_testcase.RpcTypeUnaryCall + +class AffinityTest(xds_gamma_testcase.GammaXdsKubernetesTestCase): + def test_ping_pong(self): + REPLICA_COUNT = 3 + + test_servers: List[_XdsTestServer] + with self.subTest("01_run_test_server"): + test_servers = self.startTestServers(replica_count=REPLICA_COUNT) + + with self.subTest("02_create_ssa_policy"): + self.server_runner.createSessionAffinityPolicy("service") + + # Default is round robin LB policy. + + with self.subTest("03_start_test_client"): + test_client: _XdsTestClient = self.startTestClient(test_servers[0]) + + cookie = "" + hostname = "" + chosenServerIdx = None + with self.subTest("04_send_first_RPC_and_retrieve_cookie"): + cookies = self.assertSuccessfulRpcs(test_client, 1) + print(cookies) + hostname = next(iter(cookies)) + cookie = cookies[hostname] + for idx, server in enumerate(test_servers): + if server.hostname == hostname: + chosenServerIdx = idx + break + + with self.subTest("05_send_RPCs_with_cookie"): + test_client.update_config.configure( + rpc_types=(RpcTypeUnaryCall,), + metadata=( + ( + RpcTypeUnaryCall, + "cookie", + cookie, + ), + ), + ) + self.assertRpcsEventuallyGoToGivenServers( + test_client, test_servers[chosenServerIdx:chosenServerIdx+1], 10 + ) + +if __name__ == "__main__": + absltest.main(failfast=True)