Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subject access review tests #547

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion testsuite/kuadrant/policy/authorization/sections.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ def add_external_opa_policy(self, name, endpoint, ttl=0, **common_features):
self.add_item(name, {"opa": {"externalPolicy": {"url": endpoint, "ttl": ttl}}}, **common_features)

@modify
def add_kubernetes(self, name: str, user: ABCValue, resource_attributes: dict, **common_features):
def add_kubernetes(self, name: str, user: ABCValue, resource_attributes: dict = None, **common_features):
"""Adds Kubernetes authorization

:param name: name of kubernetes authorization
Expand Down
59 changes: 59 additions & 0 deletions testsuite/kubernetes/cluster_role.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
"""ClusterRole and ClusterRoleBinding objects for Kubernetes"""

from typing import Any
from testsuite.kubernetes import KubernetesObject


class ClusterRole(KubernetesObject):
"""Kubernetes ClusterRole"""

@classmethod
def create_instance(
cls,
cluster,
name,
rules: list[dict[str, Any]] = None,
labels: dict[str, str] = None,
):
"""Creates a new ClusterRole instance"""
model: dict = {
"kind": "ClusterRole",
"apiVersion": "rbac.authorization.k8s.io/v1",
"metadata": {
"name": name,
"labels": labels,
},
"rules": rules,
}
return cls(model, context=cluster.context)


class ClusterRoleBinding(KubernetesObject):
"""Kubernetes ClusterRoleBinding"""

@classmethod
def create_instance(
cls,
cluster,
name,
cluster_role: str,
serviceaccounts: list[str],
labels: dict[str, str] = None,
):
"""Creates a new ClusterRoleBinding object"""
model: dict = {
"kind": "ClusterRoleBinding",
"apiVersion": "rbac.authorization.k8s.io/v1",
"metadata": {
"name": name,
"labels": labels,
},
"roleRef": {
"kind": "ClusterRole",
"name": cluster_role,
},
"subjects": [
{"kind": "ServiceAccount", "name": name, "namespace": cluster.project} for name in serviceaccounts
],
}
return cls(model, context=cluster.context)
15 changes: 15 additions & 0 deletions testsuite/tests/singlecluster/authorino/identity/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,21 @@

import pytest

from testsuite.kubernetes.service_account import ServiceAccount


@pytest.fixture(scope="module")
def create_service_account(request, cluster, blame, module_label):
"""Creates and returns service account"""

def _create_service_account(name):
service_account = ServiceAccount.create_instance(cluster, blame(name), labels={"app": module_label})
request.addfinalizer(service_account.delete)
service_account.commit()
return service_account

return _create_service_account


@pytest.fixture(scope="module")
def authorization(authorization):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
"""Conftest for SubjectAccessReview related tests."""

import pytest

from testsuite.httpx.auth import HeaderApiKeyAuth
from testsuite.kubernetes.cluster_role import ClusterRole, ClusterRoleBinding


@pytest.fixture(scope="module")
def cluster_role(request, cluster, blame, module_label):
"""Creates and returns a ClusterRole"""
rules = [{"nonResourceURLs": ["/get"], "verbs": ["get"]}]
cluster_role = ClusterRole.create_instance(cluster, blame("cr"), rules, labels={"app": module_label})
request.addfinalizer(cluster_role.delete)
cluster_role.commit()
return cluster_role


@pytest.fixture(scope="module")
def create_cluster_role_binding(request, cluster, blame, module_label):
"""Creates and returns a ClusterRoleBinding"""

def _create_cluster_role_binding(cluster_role, service_accounts):
cluster_role_binding = ClusterRoleBinding.create_instance(
cluster, blame("crb"), cluster_role, service_accounts, labels={"app": module_label}
)
request.addfinalizer(cluster_role_binding.delete)
cluster_role_binding.commit()
return cluster_role_binding

return _create_cluster_role_binding


@pytest.fixture(scope="module")
def bound_service_account_token(cluster_role, create_service_account, create_cluster_role_binding, audience):
"""Create a ServiceAccount, bind it to a ClusterRole and return its token with a given audience"""
service_account = create_service_account("tkn-auth")
create_cluster_role_binding(cluster_role.model.metadata.name, [service_account.model.metadata.name])
return service_account.get_auth_token(audience)
averevki marked this conversation as resolved.
Show resolved Hide resolved


@pytest.fixture(scope="module")
def auth(bound_service_account_token):
"""Create request auth with service account token as API key"""
return HeaderApiKeyAuth(bound_service_account_token, "Bearer")
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
"""Test kubernetes SubjectAccessReview authorization by verifying only a
ServiceAccount bound to a ClusterRole is authorized to access a resource"""

import pytest

from testsuite.httpx.auth import HeaderApiKeyAuth
from testsuite.kuadrant.policy.authorization import ValueFrom

pytestmark = [pytest.mark.authorino]


@pytest.fixture(scope="module")
def authorization(authorization):
"""Add kubernetes token-review and subject-access-review identity"""
authorization.identity.add_kubernetes("token-review-host")
user = ValueFrom("auth.identity.user.username")
authorization.authorization.add_kubernetes("subject-access-review-host", user)
return authorization


@pytest.fixture(scope="module")
def audience(hostname):
"""Return hostname as only audience for the service account bound token"""
return [hostname.hostname]


@pytest.fixture(scope="module")
def service_account_token(create_service_account, audience):
"""Create a non-authorized service account and request its bound token with the hostname as audience"""
service_account = create_service_account("tkn-non-auth")
return service_account.get_auth_token(audience)


@pytest.fixture(scope="module")
def auth2(service_account_token):
"""Create request auth with service account token as API key"""
return HeaderApiKeyAuth(service_account_token, "Bearer")


def test_host_audience(client, auth, auth2):
"""Test Kubernetes SubjectAccessReview functionality by setting up authentication and authorization for an endpoint
and querying it with non-authorized and authorized ServiceAccount."""
response = client.get("/get", auth=auth2)
assert response.status_code == 403

response = client.get("/get", auth=auth)
assert response.status_code == 200
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,6 @@
import pytest

from testsuite.httpx.auth import HeaderApiKeyAuth
from testsuite.kubernetes.service_account import ServiceAccount


@pytest.fixture(scope="module")
def create_service_account(request, cluster, blame, module_label):
"""Creates and returns service account"""

def _create_service_account(name):
service_account = ServiceAccount.create_instance(cluster, blame(name), labels={"app": module_label})
request.addfinalizer(service_account.delete)
service_account.commit()
return service_account

return _create_service_account


@pytest.fixture(scope="module")
Expand Down
Loading