Skip to content

Commit

Permalink
GAMMA and Stateful Session Affinity tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ginayeh committed Sep 14, 2023
1 parent aaf6c34 commit 0f859af
Show file tree
Hide file tree
Showing 27 changed files with 901 additions and 52 deletions.
3 changes: 2 additions & 1 deletion tools/run_tests/run_xds_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ def parse_port_range(port_arg):
_SPONGE_XML_NAME = "sponge_log.xml"


def get_client_stats(num_rpcs, timeout_sec):
def get_client_stats(num_rpcs, timeout_sec, metadata):
if CLIENT_HOSTS:
hosts = CLIENT_HOSTS
else:
Expand All @@ -440,6 +440,7 @@ def get_client_stats(num_rpcs, timeout_sec):
request = messages_pb2.LoadBalancerStatsRequest()
request.num_rpcs = num_rpcs
request.timeout_sec = timeout_sec
request.metadata = metadata
rpc_timeout = timeout_sec + _CONNECTION_TIMEOUT_SEC
logger.debug(
"Invoking GetClientStats RPC to %s:%d:", host, args.stats_port
Expand Down
4 changes: 1 addition & 3 deletions tools/run_tests/xds_k8s_test_driver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ sudo apt-get install python3-venv
##### Getting Started

1. If you haven't, [initialize](https://cloud.google.com/sdk/docs/install-sdk) gcloud SDK
2. Activate gcloud [configuration](https://cloud.google.com/sdk/docs/configurations) with your project
2. Activate gcloud [configuration](https://cloud.google.com/sdk/docs/configurations) with your project
3. Enable gcloud services:
```shell
gcloud services enable \
Expand All @@ -54,7 +54,6 @@ sudo apt-get install python3-venv

#### Configure GKE cluster
This is an example outlining minimal requirements to run the [baseline tests](xds-baseline-tests).

Update gcloud SDK:
```shell
gcloud -q components update
Expand Down Expand Up @@ -91,7 +90,6 @@ gcloud container clusters create "${CLUSTER_NAME}" \
--workload-metadata=GKE_METADATA \
--tags=allow-health-checks
```

For security tests you also need to create CAs and configure the cluster to use those CAs
as described
[here](https://cloud.google.com/traffic-director/docs/security-proxyless-setup#configure-cas).
Expand Down
20 changes: 18 additions & 2 deletions tools/run_tests/xds_k8s_test_driver/framework/helpers/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@

# Type aliases
# Peer name -> number of RPCs that peer handled.
RpcsByPeer = Dict[str, int]

RpcMetadata = grpc_testing.LoadBalancerStatsResponse.RpcMetadata
# Peer name -> that peer's RPC metadata rendered as one "key: value, "
# string, i.e. the shape _parse_metadatas_by_peer() produces.
MetadataByPeer = Dict[str, str]

@functools.cache # pylint: disable=no-member
def status_from_int(grpc_status_int: int) -> Optional[grpc.StatusCode]:
Expand Down Expand Up @@ -131,6 +132,8 @@ class PrettyLoadBalancerStats:
# }
rpcs_by_method: Dict[str, "RpcsByPeer"]

metadatas_by_peer: Dict[str, "MetadataByPeer"]

@staticmethod
def _parse_rpcs_by_peer(
rpcs_by_peer: grpc_testing.RpcsByPeer,
Expand All @@ -140,6 +143,19 @@ def _parse_rpcs_by_peer(
result[peer] = count
return result

@staticmethod
def _parse_metadatas_by_peer(
    metadatas_by_peer: grpc_testing.LoadBalancerStatsResponse.MetadataByPeer,
) -> "MetadataByPeer":
    """Render each peer's RPC metadata as one human-readable string.

    Args:
        metadatas_by_peer: Mapping of peer name to a message whose
            ``rpc_metadata`` entries each hold a ``metadata`` sequence of
            items exposing ``key`` and ``value``.

    Returns:
        A dict mapping every peer to its metadata items concatenated as
        ``"key: value, "``; every item keeps its trailing separator, and
        a peer without metadata maps to an empty string.
    """
    result = {}
    for peer, peer_metadata in metadatas_by_peer.items():
        # Use a distinct name per nesting level so the inner loops never
        # rebind the variable that is currently being iterated.
        result[peer] = "".join(
            f"{entry.key}: {entry.value}, "
            for rpc_metadata in peer_metadata.rpc_metadata
            for entry in rpc_metadata.metadata
        )
    return result

@classmethod
def from_response(
cls, lb_stats: grpc_testing.LoadBalancerStatsResponse
Expand All @@ -154,9 +170,9 @@ def from_response(
num_failures=lb_stats.num_failures,
rpcs_by_peer=cls._parse_rpcs_by_peer(lb_stats.rpcs_by_peer),
rpcs_by_method=rpcs_by_method,
metadatas_by_peer=cls._parse_metadatas_by_peer(lb_stats.metadatas_by_peer),
)


def lb_stats_pretty(lb: grpc_testing.LoadBalancerStatsResponse) -> str:
"""Pretty print LoadBalancerStatsResponse.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ class BackendServiceProtocol(enum.Enum):
HTTP2 = enum.auto()
GRPC = enum.auto()

class SessionAffinityProtocol(enum.Enum):
    """Session affinity protocol choices; HTTP is the only member."""

    HTTP = enum.auto()

def create_health_check(
self,
name: str,
Expand Down
139 changes: 136 additions & 3 deletions tools/run_tests/xds_k8s_test_driver/framework/infrastructure/k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@
DynResourceInstance = dynamic_res.ResourceInstance
GammaMesh = DynResourceInstance
GammaGrpcRoute = DynResourceInstance
GammaHttpRoute = DynResourceInstance
GcpSessionAffinityPolicy = DynResourceInstance
GcpSessionAffinityFilter = DynResourceInstance
GcpBackendPolicy = DynResourceInstance

_timedelta = datetime.timedelta
_ApiException = client.ApiException
Expand Down Expand Up @@ -161,6 +165,18 @@ def grpc_route(self, version: str) -> dynamic_res.Resource:

return self._load_dynamic_api(api_name, version, kind)

@functools.cache  # pylint: disable=no-member
def http_route(self, version: str) -> dynamic_res.Resource:
    """Load the dynamic API for the Gateway API ``HTTPRoute`` kind.

    Args:
        version: API version to load; v1beta1 and v1alpha2 are accepted.

    Raises:
        NotImplementedError: If ``version`` is not an accepted version.
    """
    api_name = "gateway.networking.k8s.io"
    kind = "HTTPRoute"
    if version in {"v1alpha2", "v1beta1"}:
        return self._load_dynamic_api(api_name, version, kind)

    raise NotImplementedError(
        f"{kind} {api_name}/{version} not implemented."
    )

def close(self):
# TODO(sergiitk): [GAMMA] what to do with dynamic clients?
self.client.close()
Expand Down Expand Up @@ -243,6 +259,34 @@ def api_grpc_route(self) -> dynamic_res.Resource:
"GRPCRoute",
)

@functools.cached_property  # pylint: disable=no-member
def api_http_route(self) -> dynamic_res.Resource:
    """Return the v1alpha2 HTTPRoute dynamic API (cached per instance)."""
    api_version = "gateway.networking.k8s.io/v1alpha2"
    kind = "HTTPRoute"
    return self._get_dynamic_api(api_version, kind)

@functools.cached_property  # pylint: disable=no-member
def api_session_affinity_policy(self) -> dynamic_res.Resource:
    """Return the GCPSessionAffinityPolicy dynamic API (cached)."""
    # NOTE(review): confirm _get_dynamic_api() resolves this kind in the
    # networking.gke.io group instead of raising NotImplementedError.
    api_version = "networking.gke.io/v1"
    kind = "GCPSessionAffinityPolicy"
    return self._get_dynamic_api(api_version, kind)

@functools.cached_property  # pylint: disable=no-member
def api_session_affinity_filter(self) -> dynamic_res.Resource:
    """Return the GCPSessionAffinityFilter dynamic API (cached)."""
    # NOTE(review): confirm _get_dynamic_api() resolves this kind in the
    # networking.gke.io group instead of raising NotImplementedError.
    api_version = "networking.gke.io/v1"
    kind = "GCPSessionAffinityFilter"
    return self._get_dynamic_api(api_version, kind)

@functools.cached_property  # pylint: disable=no-member
def api_backend_policy(self) -> dynamic_res.Resource:
    """Return the GCPBackendPolicy dynamic API (cached per instance)."""
    # NOTE(review): confirm _get_dynamic_api() resolves this kind in the
    # networking.gke.io group instead of raising NotImplementedError.
    api_version = "networking.gke.io/v1"
    kind = "GCPBackendPolicy"
    return self._get_dynamic_api(api_version, kind)

def _refresh_auth(self):
logger.info("Reloading k8s api client to refresh the auth.")
self._api.reload()
Expand Down Expand Up @@ -276,6 +320,8 @@ def _get_dynamic_api(self, api_version, kind) -> dynamic_res.Resource:
elif group == "gateway.networking.k8s.io":
if kind == "GRPCRoute":
return self._api.grpc_route(version)
elif kind == "HTTPRoute":
return self._api.http_route(version)

raise NotImplementedError(f"{kind} {api_version} not implemented.")

Expand Down Expand Up @@ -445,8 +491,17 @@ def get_service(self, name) -> V1Service:
def get_gamma_mesh(self, name) -> Optional[GammaMesh]:
return self._get_dyn_resource(self.api_gke_mesh, name)

def get_gamma_route(self, name) -> Optional[GammaGrpcRoute]:
return self._get_dyn_resource(self.api_grpc_route, name)
def get_gamma_route(self, name) -> Optional[GammaHttpRoute]:
    """Look up the GAMMA route ``name`` through the HTTPRoute API."""
    http_route_api = self.api_http_route
    return self._get_dyn_resource(http_route_api, name)

def get_session_affinity_policy(
    self, name
) -> Optional[GcpSessionAffinityPolicy]:
    """Look up the GCPSessionAffinityPolicy resource called ``name``."""
    policy_api = self.api_session_affinity_policy
    return self._get_dyn_resource(policy_api, name)

def get_session_affinity_filter(
    self, name
) -> Optional[GcpSessionAffinityFilter]:
    """Look up the GCPSessionAffinityFilter resource called ``name``."""
    filter_api = self.api_session_affinity_filter
    return self._get_dyn_resource(filter_api, name)

def get_backend_policy(self, name) -> Optional[GcpBackendPolicy]:
    """Look up the GCPBackendPolicy resource called ``name``."""
    backend_policy_api = self.api_backend_policy
    return self._get_dyn_resource(backend_policy_api, name)

def get_service_account(self, name) -> V1Service:
return self._get_resource(
Expand Down Expand Up @@ -502,7 +557,46 @@ def delete_gamma_route(
# TODO(sergiitk): [GAMMA] Can we call delete on dynamic_res.ResourceList
# to avoid no-member issues due to dynamic_res.Resource proxying calls?
self._execute(
self.api_grpc_route.delete, # pylint: disable=no-member
self.api_http_route.delete, # pylint: disable=no-member
name=name,
namespace=self.name,
propagation_policy="Foreground",
grace_period_seconds=grace_period_seconds,
)

def delete_session_affinity_policy(
    self,
    name: str,
    grace_period_seconds=DELETE_GRACE_PERIOD_SEC,
) -> None:
    """Delete the GCPSessionAffinityPolicy ``name`` from this namespace.

    The delete call is issued through ``self._execute`` with foreground
    propagation and the given grace period.
    """
    delete_fn = self.api_session_affinity_policy.delete
    delete_kwargs = {
        "name": name,
        "namespace": self.name,
        "propagation_policy": "Foreground",
        "grace_period_seconds": grace_period_seconds,
    }
    self._execute(delete_fn, **delete_kwargs)

def delete_session_affinity_filter(
    self,
    name: str,
    grace_period_seconds=DELETE_GRACE_PERIOD_SEC,
) -> None:
    """Delete the GCPSessionAffinityFilter ``name`` from this namespace.

    The delete call is issued through ``self._execute`` with foreground
    propagation and the given grace period.
    """
    delete_fn = self.api_session_affinity_filter.delete
    delete_kwargs = {
        "name": name,
        "namespace": self.name,
        "propagation_policy": "Foreground",
        "grace_period_seconds": grace_period_seconds,
    }
    self._execute(delete_fn, **delete_kwargs)

def delete_backend_policy(
self,
name: str,
grace_period_seconds=DELETE_GRACE_PERIOD_SEC,
) -> None:
self._execute(
self.api_backend_policy.delete,
name=name,
namespace=self.name,
propagation_policy="Foreground",
Expand Down Expand Up @@ -561,6 +655,45 @@ def wait_for_get_gamma_route_deleted(
)
retryer(self.get_gamma_route, name)

def wait_for_get_session_affinity_policy_deleted(
    self,
    name: str,
    timeout_sec: int = WAIT_MEDIUM_TIMEOUT_SEC,
    wait_sec: int = WAIT_MEDIUM_SLEEP_SEC,
) -> None:
    """Wait for the session affinity policy ``name`` to be gone.

    Runs ``get_session_affinity_policy(name)`` under a constant retryer
    whose result check accepts only ``None``.

    Args:
        name: Name of the policy to look up.
        timeout_sec: Seconds passed to the retryer as its timeout.
        wait_sec: Seconds passed to the retryer as its fixed wait.
    """

    def _is_deleted(policy) -> bool:
        return policy is None

    poll = retryers.constant_retryer(
        wait_fixed=_timedelta(seconds=wait_sec),
        timeout=_timedelta(seconds=timeout_sec),
        check_result=_is_deleted,
    )
    poll(self.get_session_affinity_policy, name)

def wait_for_get_session_affinity_filter_deleted(
    self,
    name: str,
    timeout_sec: int = WAIT_MEDIUM_TIMEOUT_SEC,
    wait_sec: int = WAIT_MEDIUM_SLEEP_SEC,
) -> None:
    """Wait for the session affinity filter ``name`` to be gone.

    Runs ``get_session_affinity_filter(name)`` under a constant retryer
    whose result check accepts only ``None``.

    Args:
        name: Name of the filter to look up.
        timeout_sec: Seconds passed to the retryer as its timeout.
        wait_sec: Seconds passed to the retryer as its fixed wait.
    """

    def _is_deleted(affinity_filter) -> bool:
        return affinity_filter is None

    poll = retryers.constant_retryer(
        wait_fixed=_timedelta(seconds=wait_sec),
        timeout=_timedelta(seconds=timeout_sec),
        check_result=_is_deleted,
    )
    poll(self.get_session_affinity_filter, name)

def wait_for_get_backend_policy_deleted(
    self,
    name: str,
    timeout_sec: int = WAIT_MEDIUM_TIMEOUT_SEC,
    wait_sec: int = WAIT_MEDIUM_SLEEP_SEC,
) -> None:
    """Wait for the backend policy ``name`` to be gone.

    Runs ``get_backend_policy(name)`` under a constant retryer whose
    result check accepts only ``None``.

    Args:
        name: Name of the backend policy to look up.
        timeout_sec: Seconds passed to the retryer as its timeout.
        wait_sec: Seconds passed to the retryer as its fixed wait.
    """

    def _is_deleted(backend_policy) -> bool:
        return backend_policy is None

    poll = retryers.constant_retryer(
        wait_fixed=_timedelta(seconds=wait_sec),
        timeout=_timedelta(seconds=timeout_sec),
        check_result=_is_deleted,
    )
    poll(self.get_backend_policy, name)

def wait_for_service_account_deleted(
self,
name: str,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@
_ComputeV1 = gcp.compute.ComputeV1
GcpResource = _ComputeV1.GcpResource
HealthCheckProtocol = _ComputeV1.HealthCheckProtocol
SessionAffinityProtocol = _ComputeV1.SessionAffinityProtocol
ZonalGcpResource = _ComputeV1.ZonalGcpResource
BackendServiceProtocol = _ComputeV1.BackendServiceProtocol
_BackendGRPC = BackendServiceProtocol.GRPC
_HealthCheckGRPC = HealthCheckProtocol.GRPC
_SessionAffinityHTTP = SessionAffinityProtocol.HTTP

# Network Security
_NetworkSecurityV1Beta1 = gcp.network_security.NetworkSecurityV1Beta1
Expand Down Expand Up @@ -65,6 +67,7 @@ class TrafficDirectorManager: # pylint: disable=too-many-public-methods
FORWARDING_RULE_NAME = "forwarding-rule"
ALTERNATIVE_FORWARDING_RULE_NAME = "forwarding-rule-alt"
FIREWALL_RULE_NAME = "allow-health-checks"
SESSION_AFFINITY_NAME = "session-affinity-policy"

def __init__(
self,
Expand Down Expand Up @@ -199,6 +202,34 @@ def delete_health_check(self, force=False):
self.compute.delete_health_check(name)
self.health_check = None

def create_session_affinity_policy(
    self,
    protocol: Optional[SessionAffinityProtocol] = _SessionAffinityHTTP,
):
    """Create the session affinity policy and remember it on the manager.

    Args:
        protocol: Protocol for the policy; ``None`` falls back to HTTP.

    Raises:
        ValueError: If a session affinity policy is already tracked.
    """
    existing = self.session_affinity_policy
    if existing:
        raise ValueError(
            f"Session affinity policy {existing.name} "
            "already created, delete it first"
        )

    chosen = _SessionAffinityHTTP if protocol is None else protocol
    name = self.make_resource_name(self.SESSION_AFFINITY_NAME)
    logger.info(
        'Creating %s Session Affinity Policy "%s"', chosen.name, name
    )
    self.session_affinity_policy = (
        self.compute.create_session_affinity_policy(name, chosen)
    )

def delete_session_affinity_policy(self, force=False):
    """Delete the session affinity policy and clear the tracked reference.

    Args:
        force: When true, delete by the generated resource name even if
            no policy is tracked; otherwise do nothing unless a policy
            is tracked.
    """
    tracked = self.session_affinity_policy
    if not force and not tracked:
        return

    if force:
        name = self.make_resource_name(self.SESSION_AFFINITY_NAME)
    else:
        name = tracked.name
    logger.info('Deleting Session Affinity Policy "%s"', name)
    self.compute.delete_session_affinity_policy(name)
    self.session_affinity_policy = None

def create_backend_service(
self,
protocol: Optional[BackendServiceProtocol] = _BackendGRPC,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,17 @@ def get_client_stats(
*,
num_rpcs: int,
timeout_sec: Optional[int] = STATS_PARTIAL_RESULTS_TIMEOUT_SEC,
metadata_keys: list[str] = None,
) -> LoadBalancerStatsResponse:
if timeout_sec is None:
timeout_sec = self.STATS_PARTIAL_RESULTS_TIMEOUT_SEC

return self.call_unary_with_deadline(
rpc="GetClientStats",
req=_LoadBalancerStatsRequest(
num_rpcs=num_rpcs, timeout_sec=timeout_sec
num_rpcs=num_rpcs,
timeout_sec=timeout_sec,
metadata_keys=metadata_keys,
),
deadline_sec=timeout_sec,
log_level=logging.INFO,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,11 @@ def get_load_balancer_stats(
"""
Shortcut to LoadBalancerStatsServiceClient.get_client_stats()
"""
logger.info("[gina] client_app.py get_load_balancer_stats")
return self.load_balancer_stats.get_client_stats(
num_rpcs=num_rpcs, timeout_sec=timeout_sec
num_rpcs=num_rpcs,
timeout_sec=timeout_sec,
metadata_keys="*",
)

def get_load_balancer_accumulated_stats(
Expand Down
Loading

0 comments on commit 0f859af

Please sign in to comment.