Skip to content

Commit

Permalink
New Auditor: Keycloak version check (#41)
Browse files Browse the repository at this point in the history
* Add auditor to check the Keycloak version

Signed-off-by: Tim Walter <[email protected]>

* Fix wrong return types

Signed-off-by: Tim Walter <[email protected]>

* Use the generic parent class for all realm auditors

Signed-off-by: Tim Walter <[email protected]>

* Simplify setting up the auditor that should be tested

Signed-off-by: Tim Walter <[email protected]>

---------

Signed-off-by: Tim Walter <[email protected]>
  • Loading branch information
twwd authored Oct 2, 2024
1 parent 7fadaa1 commit 853b192
Show file tree
Hide file tree
Showing 17 changed files with 231 additions and 76 deletions.
17 changes: 16 additions & 1 deletion docs/auditors/realm.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ title: Realm
---

# Realm Misconfigurations
These auditors check the realm-wide settings, like token lifetimes and global security features.

These auditors check the realm-wide settings, like token lifetimes and global security features.

## RefreshTokensShouldBeRevokedAfterUse

Expand Down Expand Up @@ -61,3 +62,17 @@ It may also compromise the integrity of user data, especially in applications wh

Realms detected with email verification turned off are highlighted for administrators to reassess this configuration choice.
Depending on the application's requirements and the level of trust needed in user-provided email addresses, enabling email verification may be advisable to enhance security and ensure the credibility of user accounts.

## KeycloakVersionShouldBeUpToDate

!!! warning

This auditor only checks for an exact match with the latest version. For a list of the actual vulnerabilities, use a vulnerabilty scanner like _trivy_.

This auditor examines if the used Keycloak version is up to date.
Only the latest version of the Keycloak retrieves security fixes, and thus it is crucial to keep up with the latest version.
kcwarden performs only a basic check for an exact match between the version of the analyzed Keycloak and the latest release.
For this check, it fetches the latest version from the GitHub releases of Keycloak.

If a RedHat version of Keycloak is used, it might have received backports.
In this case, the severity is lowered and the long description includes a corresponding hint.
20 changes: 20 additions & 0 deletions kcwarden/auditors/realm/abstract_realm_auditor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import abc
from typing import Generator

from kcwarden.api import Auditor
from kcwarden.custom_types.keycloak_object import Realm
from kcwarden.custom_types.result import Result


class AbstractRealmAuditor(Auditor, abc.ABC):
def should_consider_realm(self, realm: Realm) -> bool:
return self.is_not_ignored(realm)

def audit(self) -> Generator[Result, None, None]:
for realm in self._DB.get_all_realms():
if self.should_consider_realm(realm):
yield from self.audit_realm(realm)

@abc.abstractmethod
def audit_realm(self, realm: Realm) -> Generator[Result, None, None]:
raise NotImplementedError()
34 changes: 34 additions & 0 deletions kcwarden/auditors/realm/keycloak_version_should_be_up_to_date.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from kcwarden.auditors.realm.abstract_realm_auditor import AbstractRealmAuditor
from kcwarden.custom_types.result import Severity
from kcwarden.utils.github import get_latest_keycloak_version


class KeycloakVersionShouldBeUpToDate(AbstractRealmAuditor):
DEFAULT_SEVERITY = Severity.Medium
SHORT_DESCRIPTION = "Keycloak version should be up-to-date"
LONG_DESCRIPTION = "Only the latest version of Keycloak retrieves security fixes. The used version seems to be outdated and requires an update. Use a vulnerability scanner for a list of the actual vulnerabilities."
REFERENCE = ""

def audit_realm(self, realm):
current_version = realm.get_keycloak_version()
latest_version = get_latest_keycloak_version()
# We use a rudimentary check here and do not perform a comparison based on semantic versioning, etc.
is_outdated = current_version != latest_version
# Special handling for the RedHat SSO or RedHat build of Keycloak
is_redhat = "redhat" in current_version
if is_outdated:
yield self.generate_finding(
realm,
additional_details={
"current_version": current_version,
"latest_version": latest_version if latest_version is not None else "Could not be determined.",
},
# When the RedHat version is used, it is likely that this is a version with backports
# and thus has hopefully less known security issues
override_severity=Severity.Low if is_redhat else None,
override_long_description=self.LONG_DESCRIPTION
+ " This might be false-positive since a RedHat version of "
"Keycloak is used that might have received backports."
if is_redhat
else None,
)
23 changes: 11 additions & 12 deletions kcwarden/auditors/realm/realm_email_verification_disabled.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
from kcwarden.api import Auditor
from kcwarden.custom_types.result import Severity
from typing import Generator

from kcwarden.auditors.realm.abstract_realm_auditor import AbstractRealmAuditor
from kcwarden.custom_types.keycloak_object import Realm
from kcwarden.custom_types.result import Severity, Result

class RealmEmailVerificationDisabled(Auditor):

class RealmEmailVerificationDisabled(AbstractRealmAuditor):
DEFAULT_SEVERITY = Severity.Info
SHORT_DESCRIPTION = "Email verification disabled"
LONG_DESCRIPTION = "The realm does not have email verification enabled, meaning that email addresses of users haven't been verified using a double opt-in mechanism. Depending on the source of the addresses, they may not be trustworthy."
REFERENCE = ""

def should_consider_realm(self, realm) -> bool:
return self.is_not_ignored(realm)

def realm_has_email_verification_disabled(self, realm) -> bool:
@staticmethod
def realm_has_email_verification_disabled(realm) -> bool:
return not realm.is_verify_email_enabled()

def audit(self):
for realm in self._DB.get_all_realms():
if self.should_consider_realm(realm):
if self.realm_has_email_verification_disabled(realm):
yield self.generate_finding(realm)
def audit_realm(self, realm: Realm) -> Generator[Result, None, None]:
if self.realm_has_email_verification_disabled(realm):
yield self.generate_finding(realm)
23 changes: 11 additions & 12 deletions kcwarden/auditors/realm/realm_self_registration_enabled.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
from kcwarden.api import Auditor
from kcwarden.custom_types.result import Severity
from typing import Generator

from kcwarden.auditors.realm.abstract_realm_auditor import AbstractRealmAuditor
from kcwarden.custom_types.keycloak_object import Realm
from kcwarden.custom_types.result import Severity, Result

class RealmSelfRegistrationEnabled(Auditor):

class RealmSelfRegistrationEnabled(AbstractRealmAuditor):
DEFAULT_SEVERITY = Severity.Info
SHORT_DESCRIPTION = "Self-Registration enabled"
LONG_DESCRIPTION = "The realm supports self-registration, which means that anyone can register an account. In some cases, this may not be desired, hence kcwarden is flagging this behavior."
REFERENCE = ""

def should_consider_realm(self, realm) -> bool:
return self.is_not_ignored(realm)

def realm_has_self_registration_enabled(self, realm) -> bool:
@staticmethod
def realm_has_self_registration_enabled(realm) -> bool:
return realm.is_self_registration_enabled()

def audit(self):
for realm in self._DB.get_all_realms():
if self.should_consider_realm(realm):
if self.realm_has_self_registration_enabled(realm):
yield self.generate_finding(realm)
def audit_realm(self, realm: Realm) -> Generator[Result, None, None]:
if self.realm_has_self_registration_enabled(realm):
yield self.generate_finding(realm)
25 changes: 12 additions & 13 deletions kcwarden/auditors/realm/refresh_token_reuse_count_should_be_zero.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
from kcwarden.api import Auditor
from kcwarden.custom_types.result import Severity
from typing import Generator

from kcwarden.auditors.realm.abstract_realm_auditor import AbstractRealmAuditor
from kcwarden.custom_types.keycloak_object import Realm
from kcwarden.custom_types.result import Severity, Result

class RefreshTokenReuseCountShouldBeZero(Auditor):

class RefreshTokenReuseCountShouldBeZero(AbstractRealmAuditor):
DEFAULT_SEVERITY = Severity.Medium
SHORT_DESCRIPTION = "Refresh tokens MUST be invalidated after use"
LONG_DESCRIPTION = "Refresh tokens allow a client to obtain a new access token. However, if they get leaked, it may allow an attacker to obtain a long-lived session. Thus, they MUST be rotated after use. In this case, the realm is configured to revoke refresh tokens after a set number of uses, but allows the token to be used more than once. This weakens the security of the setting. (Be advised that at the time of writing, revoking refresh tokens may have undesired results when more than one refresh token can be issued by the same client to the same user, for example in some methods of keeping keys in the frontend. Please consult the following Keycloak issue for more details: https://github.com/keycloak/keycloak/issues/14122)"
REFERENCE = "https://datatracker.ietf.org/doc/html/draft-ietf-oauth-security-topics-23#section-2.2.2"

def should_consider_realm(self, realm) -> bool:
return self.is_not_ignored(realm)

def realm_has_refresh_token_reuse_enabled(self, realm) -> bool:
@staticmethod
def realm_has_refresh_token_reuse_enabled(realm) -> bool:
return realm.has_refresh_token_revocation_enabled() and realm.get_refresh_token_maximum_reuse_count() > 0

def audit(self):
for realm in self._DB.get_all_realms():
# Find realms that have refresh token revocation enabled, but allow a token to be reused more than once
if self.should_consider_realm(realm):
if self.realm_has_refresh_token_reuse_enabled(realm):
yield self.generate_finding(realm)
def audit_realm(self, realm: Realm) -> Generator[Result, None, None]:
# Find realms that have refresh token revocation enabled, but allow a token to be reused more than once
if self.realm_has_refresh_token_reuse_enabled(realm):
yield self.generate_finding(realm)
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
from kcwarden.api import Auditor
from kcwarden.custom_types.result import Severity
from typing import Generator

from kcwarden.auditors.realm.abstract_realm_auditor import AbstractRealmAuditor
from kcwarden.custom_types.keycloak_object import Realm
from kcwarden.custom_types.result import Severity, Result

class RefreshTokensShouldBeRevokedAfterUse(Auditor):

class RefreshTokensShouldBeRevokedAfterUse(AbstractRealmAuditor):
DEFAULT_SEVERITY = Severity.Medium
SHORT_DESCRIPTION = "Refresh tokens MUST be invalidated after use"
LONG_DESCRIPTION = "Refresh tokens allow a client to obtain a new access token. However, if they get leaked, it may allow an attacker to obtain a long-lived session. Thus, they MUST be rotated after use. (Be advised that at the time of writing, revoking refresh tokens may have undesired results when more than one refresh token can be issued by the same client to the same user, for example in some methods of keeping keys in the frontend. Please consult the following Keycloak issue for more details: https://github.com/keycloak/keycloak/issues/14122)"
REFERENCE = "https://datatracker.ietf.org/doc/html/draft-ietf-oauth-security-topics-23#section-2.2.2"

def should_consider_realm(self, realm) -> bool:
return self.is_not_ignored(realm)

def realm_has_refresh_token_revocation_disabled(self, realm) -> bool:
@staticmethod
def realm_has_refresh_token_revocation_disabled(realm) -> bool:
return not realm.has_refresh_token_revocation_enabled()

def audit(self):
for realm in self._DB.get_all_realms():
# Find realms that have refresh token revocation disabled
if self.should_consider_realm(realm):
if self.realm_has_refresh_token_revocation_disabled(realm):
yield self.generate_finding(realm)
def audit_realm(self, realm: Realm) -> Generator[Result, None, None]:
# Find realms that have refresh token revocation disabled
if self.realm_has_refresh_token_revocation_disabled(realm):
yield self.generate_finding(realm)
2 changes: 2 additions & 0 deletions kcwarden/auditors/realm_auditor.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from kcwarden.auditors.realm.keycloak_version_should_be_up_to_date import KeycloakVersionShouldBeUpToDate
from kcwarden.auditors.realm.realm_email_verification_disabled import RealmEmailVerificationDisabled
from kcwarden.auditors.realm.realm_self_registration_enabled import RealmSelfRegistrationEnabled
from kcwarden.auditors.realm.refresh_token_reuse_count_should_be_zero import RefreshTokenReuseCountShouldBeZero
Expand All @@ -11,4 +12,5 @@
RefreshTokenReuseCountShouldBeZero,
RealmSelfRegistrationEnabled,
RealmEmailVerificationDisabled,
KeycloakVersionShouldBeUpToDate,
]
15 changes: 8 additions & 7 deletions kcwarden/custom_types/database.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from abc import ABC, abstractmethod
from collections.abc import Iterable

from kcwarden.custom_types.keycloak_object import (
Client,
Expand Down Expand Up @@ -48,35 +49,35 @@ def add_identity_provider(self, idp: IdentityProvider) -> None:

### Full list getters
@abstractmethod
def get_all_realms(self) -> list[Realm]:
def get_all_realms(self) -> Iterable[Realm]:
raise NotImplementedError()

@abstractmethod
def get_all_clients(self) -> list[Client]:
def get_all_clients(self) -> Iterable[Client]:
raise NotImplementedError()

@abstractmethod
def get_all_scopes(self) -> list[ClientScope]:
def get_all_scopes(self) -> Iterable[ClientScope]:
raise NotImplementedError()

@abstractmethod
def get_all_service_accounts(self) -> list[ServiceAccount]:
def get_all_service_accounts(self) -> Iterable[ServiceAccount]:
raise NotImplementedError()

@abstractmethod
def get_all_groups(self) -> list[Group]:
def get_all_groups(self) -> Iterable[Group]:
raise NotImplementedError()

@abstractmethod
def get_all_realm_roles(self) -> list[RealmRole]:
def get_all_realm_roles(self) -> Iterable[RealmRole]:
raise NotImplementedError()

@abstractmethod
def get_all_client_roles(self) -> dict[str, dict[str, ClientRole]]:
raise NotImplementedError()

@abstractmethod
def get_all_identity_providers(self) -> list[IdentityProvider]:
def get_all_identity_providers(self) -> Iterable[IdentityProvider]:
raise NotImplementedError()

### Specific getters
Expand Down
3 changes: 3 additions & 0 deletions kcwarden/custom_types/keycloak_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ def get_refresh_token_maximum_reuse_count(self) -> int:
def has_declarative_user_profiles_enabled(self) -> bool:
return self._d["attributes"].get("userProfileEnabled", "false") == "true"

def get_keycloak_version(self) -> str:
return self._d["keycloakVersion"]


class RealmRole(Dataclass):
"""
Expand Down
14 changes: 14 additions & 0 deletions kcwarden/utils/github.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import requests

from kcwarden.utils.plugins import logger

GITHUB_API_PATH_LATEST_KEYCLOAK_RELEASE = "https://api.github.com/repos/keycloak/keycloak/releases/latest"


def get_latest_keycloak_version() -> str | None:
try:
response = requests.get(GITHUB_API_PATH_LATEST_KEYCLOAK_RELEASE, timeout=10).json()
return response.get("tag_name", None)
except requests.exceptions.RequestException as e:
logger.warning("Latest Keycloak version cannot be fetched from GitHub due to: %s", e)
return None
69 changes: 69 additions & 0 deletions tests/auditors/realm/test_keycloak_version_should_be_up_to_date.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from unittest import mock

import pytest

from kcwarden.auditors.realm.keycloak_version_should_be_up_to_date import KeycloakVersionShouldBeUpToDate
from kcwarden.custom_types.keycloak_object import Realm
from kcwarden.custom_types.result import Severity

UUT = "KeycloakVersionShouldBeUpToDate"

KEYCLOAK_VERSION_PATCH_TARGET = (
"kcwarden.auditors.realm.keycloak_version_should_be_up_to_date.get_latest_keycloak_version"
)


class TestKeycloakVersionShouldBeUpToDate:
@pytest.fixture
def auditor(self, mock_database, default_config):
return KeycloakVersionShouldBeUpToDate(mock_database, default_config)

def test_audit__given_the_latest_keycloak_version(self, auditor, mock_realm: Realm):
with mock.patch(KEYCLOAK_VERSION_PATCH_TARGET) as keycloak_version_mock:
keycloak_version_mock.return_value = "99.9.9"
# Setup realm with a fictional version that is the latest one
mock_realm.get_keycloak_version.return_value = "99.9.9"
auditor._DB.get_all_realms.return_value = [mock_realm]

results = list(auditor.audit())
assert len(results) == 0
keycloak_version_mock.assert_called_once()

def test_audit__given_an_outdated_keycloak_version(self, auditor, mock_realm: Realm):
with mock.patch(KEYCLOAK_VERSION_PATCH_TARGET) as keycloak_version_mock:
keycloak_version_mock.return_value = "99.9.9"
# Setup realm with a fictional version that is old
mock_realm.get_keycloak_version.return_value = "20.9.9"
auditor._DB.get_all_realms.return_value = [mock_realm]

results = list(auditor.audit())
assert len(results) == 1
result = results[0]
assert result.get_reporting_auditor() == UUT
assert result.severity == Severity.Medium
keycloak_version_mock.assert_called_once()

def test_audit__given_an_outdated_redhat_keycloak_version(self, auditor, mock_realm: Realm):
with mock.patch(KEYCLOAK_VERSION_PATCH_TARGET) as keycloak_version_mock:
keycloak_version_mock.return_value = "99.9.9"
# Setup realm with a fictional RedHat version
mock_realm.get_keycloak_version.return_value = "20.9.9.redhat-00001"
auditor._DB.get_all_realms.return_value = [mock_realm]

results = list(auditor.audit())
assert len(results) == 1
result = results[0]
assert result.get_reporting_auditor() == "KeycloakVersionShouldBeUpToDate"
assert result.severity == Severity.Low
keycloak_version_mock.assert_called_once()

def test_audit__given_an_undetermined_latest_version(self, auditor, mock_realm: Realm):
with mock.patch(KEYCLOAK_VERSION_PATCH_TARGET) as keycloak_version_mock:
keycloak_version_mock.return_value = None
# Setup realm with a fictional version
mock_realm.get_keycloak_version.return_value = "99.9.9"
auditor._DB.get_all_realms.return_value = [mock_realm]

results = list(auditor.audit())
assert len(results) == 1
keycloak_version_mock.assert_called_once()
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@

class TestRealmEmailVerificationDisabled:
@pytest.fixture
def auditor(self, database, default_config):
auditor_instance = RealmEmailVerificationDisabled(database, default_config)
auditor_instance._DB = Mock()
return auditor_instance
def auditor(self, mock_database, default_config):
return RealmEmailVerificationDisabled(mock_database, default_config)

def test_should_consider_realm(self, mock_realm, auditor):
assert auditor.should_consider_realm(mock_realm) is True # Always consider unless specifically ignored
Expand Down
Loading

0 comments on commit 853b192

Please sign in to comment.