Skip to content
This repository has been archived by the owner on Mar 16, 2024. It is now read-only.

Increase code quality #28

Merged
merged 2 commits into from
Mar 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 48 additions & 41 deletions fastapi_keycloak/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,14 @@
from pydantic import BaseModel
from requests import Response

from fastapi_keycloak.exceptions import KeycloakError, MandatoryActionException, UpdateUserLocaleException, ConfigureTOTPException, VerifyEmailException, \
UpdateProfileException, UpdatePasswordException
from fastapi_keycloak.model import HTTPMethod, KeycloakUser, OIDCUser, KeycloakToken, KeycloakRole, KeycloakIdentityProvider, KeycloakGroup
from fastapi_keycloak.exceptions import (
KeycloakError, MandatoryActionException, UpdateUserLocaleException,
ConfigureTOTPException, VerifyEmailException,
UpdateProfileException, UpdatePasswordException)
from fastapi_keycloak.model import (
HTTPMethod, KeycloakUser, OIDCUser, KeycloakToken, KeycloakRole,
KeycloakIdentityProvider, KeycloakGroup
)


def result_or_error(response_model: Type[BaseModel] = None, is_list: bool = False) -> List[BaseModel] or BaseModel or KeycloakError:
Expand Down Expand Up @@ -44,10 +49,7 @@ def inner(f):
def wrapper(*args, **kwargs):

def create_list(json: List[dict]):
items = list()
for entry in json:
items.append(response_model.parse_obj(entry))
return items
return [response_model.parse_obj(entry) for entry in json]

def create_object(json: dict):
return response_model.parse_obj(json)
Expand Down Expand Up @@ -85,8 +87,8 @@ def create_object(json: dict):
class FastAPIKeycloak:
""" Instance to wrap the Keycloak API with FastAPI

Attributes:
_admin_token (KeycloakToken): A KeycloakToken instance, containing the access token that is used for any admin related request
Attributes: _admin_token (KeycloakToken): A KeycloakToken instance, containing the access token that is used for
any admin related request

Example:
```python
Expand Down Expand Up @@ -138,9 +140,8 @@ def admin_token(self):
"""
if self.token_is_valid(token=self._admin_token):
return self._admin_token
else:
self._get_admin_token()
return self.admin_token
self._get_admin_token()
return self.admin_token

@admin_token.setter
def admin_token(self, value: str):
Expand Down Expand Up @@ -185,7 +186,8 @@ def user_auth_scheme(self) -> OAuth2PasswordBearer:
return OAuth2PasswordBearer(tokenUrl=self.token_uri)

def get_current_user(self, required_roles: List[str] = None) -> OIDCUser:
""" Returns the current user based on an access token in the HTTP-header. Optionally verifies roles are possessed by the user
""" Returns the current user based on an access token in the HTTP-header. Optionally verifies roles are possessed
by the user

Args:
required_roles List[str]: List of role names required for this endpoint
Expand Down Expand Up @@ -236,7 +238,8 @@ def open_id_configuration(self) -> dict:
return response.json()

def proxy(self, relative_path: str, method: HTTPMethod, additional_headers: dict = None, payload: dict = None) -> Response:
""" Proxies a request to Keycloak and automatically adds the required Authorization header. Should not be exposed under any circumstances. Grants full API admin access.
""" Proxies a request to Keycloak and automatically adds the required Authorization header. Should not be
exposed under any circumstances. Grants full API admin access.

Args:

Expand Down Expand Up @@ -285,10 +288,17 @@ def _get_admin_token(self) -> None:
response = requests.post(url=self.token_uri, headers=headers, data=data)
try:
self.admin_token = response.json()['access_token']
except JSONDecodeError:
raise KeycloakError(reason=response.content.decode('utf-8'), status_code=response.status_code)
except KeyError:
raise KeycloakError(reason=f"The response did not contain an access_token: {response.json()}", status_code=403)
except JSONDecodeError as e:
raise KeycloakError(
reason=response.content.decode('utf-8'),
status_code=response.status_code,
) from e

except KeyError as e:
raise KeycloakError(
reason=f"The response did not contain an access_token: {response.json()}",
status_code=403,
) from e

@functools.cached_property
def public_key(self) -> str:
Expand Down Expand Up @@ -471,11 +481,10 @@ def get_subgroups(self, group: KeycloakGroup, path: str):
return subgroup
elif subgroup.subGroups:
for subgroup in group.subGroups:
subgroups = self.get_subgroups(subgroup, path)
if subgroups:
if subgroups := self.get_subgroups(subgroup, path):
return subgroups
# Went through the tree without hits
return None
return None

@result_or_error(response_model=KeycloakGroup)
def get_group_by_path(self, path: str, search_in_subgroups=True) -> KeycloakGroup or None:
Expand All @@ -502,8 +511,7 @@ def get_group_by_path(self, path: str, search_in_subgroups=True) -> KeycloakGrou
return group
res = self.get_subgroups(group, path)
if res is not None:
return res
return None
return res

@result_or_error(response_model=KeycloakGroup)
def get_group(self, group_id: str) -> KeycloakGroup or None:
Expand Down Expand Up @@ -669,13 +677,12 @@ def create_user(
"requiredActions": ["VERIFY_EMAIL" if send_email_verification else None]
}
response = self._admin_request(url=self.users_uri, data=data, method=HTTPMethod.POST)
if response.status_code == 201:
user = self.get_user(query=f'username={username}')
if send_email_verification:
self.send_email_verification(user.id)
return user
else:
if response.status_code != 201:
return response
user = self.get_user(query=f'username={username}')
if send_email_verification:
self.send_email_verification(user.id)
return user

@result_or_error()
def change_password(self, user_id: str, new_password: str, temporary: bool = False) -> dict:
Expand Down Expand Up @@ -747,9 +754,8 @@ def update_user(self, user: KeycloakUser):
Raises:
KeycloakError: If the resulting response is not a successful HTTP-Code (>299)

Notes:
- You may alter any aspect of the user object, also the requiredActions for instance. There is not explicit function for updating those as it is a user update in
essence
Notes: - You may alter any aspect of the user object, also the requiredActions for instance. There is no
explicit function for updating those as it is a user update in essence
"""
response = self._admin_request(url=f'{self.users_uri}/{user.id}', data=user.__dict__, method=HTTPMethod.PUT)
if response.status_code == 204: # Update successful
Expand Down Expand Up @@ -781,8 +787,7 @@ def get_all_users(self) -> List[KeycloakUser]:
Raises:
KeycloakError: If the resulting response is not a successful HTTP-Code (>299)
"""
response = self._admin_request(url=self.users_uri, method=HTTPMethod.GET)
return response
return self._admin_request(url=self.users_uri, method=HTTPMethod.GET)

@result_or_error(response_model=KeycloakIdentityProvider, is_list=True)
def get_identity_providers(self) -> List[KeycloakIdentityProvider]:
Expand All @@ -798,7 +803,8 @@ def get_identity_providers(self) -> List[KeycloakIdentityProvider]:

@result_or_error(response_model=KeycloakToken)
def user_login(self, username: str, password: str) -> KeycloakToken:
""" Models the password OAuth2 flow. Exchanges username and password for an access token. Will raise detailed errors if login fails due to requiredActions
""" Models the password OAuth2 flow. Exchanges username and password for an access token. Will raise detailed
errors if login fails due to requiredActions

Args:
username (str): Username used for login
Expand Down Expand Up @@ -846,15 +852,17 @@ def user_login(self, username: str, password: str) -> KeycloakToken:
}.get(
reason, # Try to return the matching exception
# On custom or unknown actions return a MandatoryActionException by default
MandatoryActionException(detail=f"This user can't login until the following action has been resolved: {reason}")
MandatoryActionException(detail=f"This user can't login until the following action has been "
f"resolved: {reason}")
)
raise exception
return response

@result_or_error(response_model=KeycloakToken)
def exchange_authorization_code(self, session_state: str, code: str) -> KeycloakToken:
""" Models the authorization code OAuth2 flow. Opening the URL provided by `login_uri` will result in a callback to the configured callback URL.
The callback will also create a session_state and code query parameter that can be exchanged for an access token.
""" Models the authorization code OAuth2 flow. Opening the URL provided by `login_uri` will result in a
callback to the configured callback URL. The callback will also create a session_state and code query
parameter that can be exchanged for an access token.

Args:
session_state (str): Salt to reduce the risk of successful attacks
Expand All @@ -877,8 +885,7 @@ def exchange_authorization_code(self, session_state: str, code: str) -> Keycloak
"grant_type": "authorization_code",
"redirect_uri": self.callback_uri
}
response = requests.post(url=self.token_uri, headers=headers, data=data)
return response
return requests.post(url=self.token_uri, headers=headers, data=data)

def _admin_request(self, url: str, method: HTTPMethod, data: dict = None, content_type: str = "application/json") -> Response:
""" Private method that is the basis for any requests requiring admin access to the api. Will append the necessary `Authorization` header
Expand Down Expand Up @@ -999,7 +1006,7 @@ def _decode_token(self, token: str, options: dict = None, audience: str = None)

def __str__(self):
""" String representation """
return f'FastAPI Keycloak Integration'
return 'FastAPI Keycloak Integration'

def __repr__(self):
""" Debug representation """
Expand Down
25 changes: 15 additions & 10 deletions fastapi_keycloak/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,35 @@ def __init__(self, detail: str) -> None:


class UpdateUserLocaleException(MandatoryActionException):
""" Throw if the exchange of username and password for an access token fails due to the update_user_locale requiredAction"""
""" Throw if the exchange of username and password for an access token fails due to the update_user_locale
requiredAction """
def __init__(self) -> None:
super().__init__(detail=f"This user can't login until he updated his locale")
super().__init__(detail="This user can't login until he updated his locale")


class ConfigureTOTPException(MandatoryActionException):
""" Throw if the exchange of username and password for an access token fails due to the CONFIGURE_TOTP requiredAction"""
""" Throw if the exchange of username and password for an access token fails due to the CONFIGURE_TOTP
requiredAction """
def __init__(self) -> None:
super().__init__(detail=f"This user can't login until he configured TOTP")
super().__init__(detail="This user can't login until he configured TOTP")


class VerifyEmailException(MandatoryActionException):
""" Throw if the exchange of username and password for an access token fails due to the VERIFY_EMAIL requiredAction"""
""" Throw if the exchange of username and password for an access token fails due to the VERIFY_EMAIL
requiredAction """
def __init__(self) -> None:
super().__init__(detail=f"This user can't login until he verified his email")
super().__init__(detail="This user can't login until he verified his email")


class UpdatePasswordException(MandatoryActionException):
""" Throw if the exchange of username and password for an access token fails due to the UPDATE_PASSWORD requiredAction"""
""" Throw if the exchange of username and password for an access token fails due to the UPDATE_PASSWORD
requiredAction """
def __init__(self) -> None:
super().__init__(detail=f"This user can't login until he updated his password")
super().__init__(detail="This user can't login until he updated his password")


class UpdateProfileException(MandatoryActionException):
""" Throw if the exchange of username and password for an access token fails due to the UPDATE_PROFILE requiredAction"""
""" Throw if the exchange of username and password for an access token fails due to the UPDATE_PROFILE
requiredAction """
def __init__(self) -> None:
super().__init__(detail=f"This user can't login until he updated his profile")
super().__init__(detail="This user can't login until he updated his profile")
25 changes: 14 additions & 11 deletions fastapi_keycloak/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ class KeycloakUser(BaseModel):
access (dict):
attributes (Optional[dict]):

Notes:
Check the Keycloak documentation at https://www.keycloak.org/docs-api/15.0/rest-api/index.html for details. This is a mere proxy object.
Notes: Check the Keycloak documentation at https://www.keycloak.org/docs-api/15.0/rest-api/index.html for
details. This is a mere proxy object.
"""
id: str
createdTimestamp: int
Expand Down Expand Up @@ -71,7 +71,7 @@ class UsernamePassword(BaseModel):


class OIDCUser(BaseModel):
""" Represents a user object of Keycloak, parsed from an access token
""" Represents a user object of Keycloak, parsed from access token

Attributes:
sub (str):
Expand All @@ -86,8 +86,8 @@ class OIDCUser(BaseModel):
preferred_username (Optional[str]):
realm_access (dict):

Notes:
Check the Keycloak documentation at https://www.keycloak.org/docs-api/15.0/rest-api/index.html for details. This is a mere proxy object.
Notes: Check the Keycloak documentation at https://www.keycloak.org/docs-api/15.0/rest-api/index.html for
details. This is a mere proxy object.
"""
sub: str
iat: int
Expand All @@ -110,8 +110,11 @@ def roles(self) -> List[str]:
"""
try:
return self.realm_access['roles']
except KeyError:
raise KeycloakError(status_code=404, reason="The 'realm_access' section of the provided access token did not contain any 'roles'")
except KeyError as e:
raise KeycloakError(
status_code=404,
reason="The 'realm_access' section of the provided access token did not contain any 'roles'",
) from e

def __str__(self) -> str:
""" String representation of an OIDCUser """
Expand All @@ -135,8 +138,8 @@ class KeycloakIdentityProvider(BaseModel):
firstBrokerLoginFlowAlias (str):
config (dict):

Notes:
Check the Keycloak documentation at https://www.keycloak.org/docs-api/15.0/rest-api/index.html for details. This is a mere proxy object.
Notes: Check the Keycloak documentation at https://www.keycloak.org/docs-api/15.0/rest-api/index.html for
details. This is a mere proxy object.
"""
alias: str
internalId: str
Expand All @@ -162,8 +165,8 @@ class KeycloakRole(BaseModel):
clientRole (bool):
containerId (str):

Notes:
Check the Keycloak documentation at https://www.keycloak.org/docs-api/15.0/rest-api/index.html for details. This is a mere proxy object.
Notes: Check the Keycloak documentation at https://www.keycloak.org/docs-api/15.0/rest-api/index.html for
details. This is a mere proxy object.
"""
id: str
name: str
Expand Down
10 changes: 7 additions & 3 deletions tests/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from fastapi import FastAPI, Depends, Query, Body
from pydantic import SecretStr

from fastapi_keycloak import FastAPIKeycloak, OIDCUser, UsernamePassword, HTTPMethod, KeycloakUser, KeycloakGroup
from fastapi_keycloak import FastAPIKeycloak, OIDCUser, UsernamePassword, HTTPMethod, KeycloakUser

app = FastAPI()
idp = FastAPIKeycloak(
Expand All @@ -22,7 +22,10 @@
# Admin

@app.post("/proxy", tags=["admin-cli"])
def proxy_admin_request(relative_path: str, method: HTTPMethod, additional_headers: dict = Body(None), payload: dict = Body(None)):
def proxy_admin_request(
relative_path: str, method: HTTPMethod, additional_headers: dict = Body(None),
payload: dict = Body(None)
):
return idp.proxy(
additional_headers=additional_headers,
relative_path=relative_path,
Expand Down Expand Up @@ -55,7 +58,8 @@ def get_user_by_query(query: str = None):

@app.post("/users", tags=["user-management"])
def create_user(first_name: str, last_name: str, email: str, password: SecretStr, id: str = None):
return idp.create_user(first_name=first_name, last_name=last_name, username=email, email=email, password=password.get_secret_value(), id=id)
return idp.create_user(first_name=first_name, last_name=last_name, username=email, email=email,
password=password.get_secret_value(), id=id)


@app.get("/user/{user_id}", tags=["user-management"])
Expand Down
6 changes: 1 addition & 5 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
import subprocess
from time import sleep


def pytest_sessionstart(session):
# subprocess.call(['sh', './start_infra.sh'])
# print("Waiting for Keycloak to start")
Expand All @@ -11,4 +7,4 @@ def pytest_sessionstart(session):

def pytest_sessionfinish(session):
# subprocess.call(['sh', './stop_infra.sh'])
pass
pass
Loading