diff --git a/.prospector.yaml b/.prospector.yaml index 970d3d7..33e0489 100644 --- a/.prospector.yaml +++ b/.prospector.yaml @@ -43,6 +43,8 @@ pylint: - assignment-from-none - redefined-outer-name - no-self-argument + - unnecessary-pass + - logging-fstring-interpolation options: max-locals: 25 max-line-length: 120 diff --git a/py_ocpi/core/authentication/authenticator.py b/py_ocpi/core/authentication/authenticator.py index 8e4f19e..422b86c 100644 --- a/py_ocpi/core/authentication/authenticator.py +++ b/py_ocpi/core/authentication/authenticator.py @@ -1,6 +1,6 @@ from abc import ABC, abstractmethod -from typing import List +from typing import List, Union from py_ocpi.core.exceptions import AuthorizationOCPIError from py_ocpi.core.config import logger @@ -18,26 +18,26 @@ async def authenticate(cls, auth_token: str) -> None: """ list_token_c = await cls.get_valid_token_c() if auth_token not in list_token_c: - logger.debug("Given `%s` token is not valid" % auth_token) + logger.debug(f"Given `{auth_token}` token is not valid") raise AuthorizationOCPIError @classmethod async def authenticate_credentials( cls, auth_token: str, - ) -> str | dict | None: + ) -> Union[str, dict, None]: """Authenticate given auth token where both tokens valid.""" if auth_token: list_token_a = await cls.get_valid_token_a() if auth_token in list_token_a: - logger.debug("Token A `%s` is used." % auth_token) + logger.debug(f"Token A `{auth_token}` is used.") return {} list_token_c = await cls.get_valid_token_c() if auth_token in list_token_c: - logger.debug("Token C `%s` is used." % auth_token) + logger.debug(f"Token C `{auth_token}` is used.") return auth_token - logger.debug("Token `%s` is not of type A or C." % auth_token) + logger.debug(f"Token `{auth_token}` is not of type A or C.") return None @classmethod diff --git a/py_ocpi/core/authentication/verifier.py b/py_ocpi/core/authentication/verifier.py index 9878a8c..2dc5301 100644 --- a/py_ocpi/core/authentication/verifier.py +++ b/py_ocpi/core/authentication/verifier.py @@ -1,3 +1,5 @@ +from typing import Union + from fastapi import ( Depends, Header, @@ -61,19 +63,19 @@ async def __call__( if self.version.startswith("2.2"): try: token = decode_string_base64(token) - except UnicodeDecodeError: + except UnicodeDecodeError as exc: logger.debug( - "Token `%s` cannot be decoded. " - "Check if the token is already encoded." % token + f"Token `{token}` cannot be decoded. " + "Check if the token is already encoded." ) - raise AuthorizationOCPIError + raise AuthorizationOCPIError from exc await authenticator.authenticate(token) - except IndexError: + except IndexError as exc: logger.debug( "Token `%s` cannot be split in parts. " "Check if it starts with `Token `" ) - raise AuthorizationOCPIError + raise AuthorizationOCPIError from exc class CredentialsAuthorizationVerifier: @@ -84,14 +86,14 @@ class CredentialsAuthorizationVerifier: :param version (VersionNumber): OCPI version used. """ - def __init__(self, version: VersionNumber | None) -> None: + def __init__(self, version: Union[VersionNumber, None]) -> None: self.version = version async def __call__( self, authorization: str = Security(api_key_header), authenticator: Authenticator = Depends(get_authenticator), - ) -> str | dict | None: + ) -> Union[str, dict, None]: """ Verifies the authorization token using the specified version and an Authenticator. @@ -106,23 +108,20 @@ async def __call__( """ try: token = authorization.split()[1] - except IndexError: - logger.debug( - "Token `%s` cannot be split in parts. " - "Check if it starts with `Token `" - ) - raise AuthorizationOCPIError + except IndexError as exc: + logger.debug("Token cannot be split in parts. Check if it starts with `Token `") + raise AuthorizationOCPIError from exc if self.version: if self.version.startswith("2.2"): try: token = decode_string_base64(token) - except UnicodeDecodeError: + except UnicodeDecodeError as exc: logger.debug( - "Token `%s` cannot be decoded. " - "Check if the token is already encoded." % token + f"Token `{token}` cannot be decoded. " + "Check if the token is already encoded." ) - raise AuthorizationOCPIError + raise AuthorizationOCPIError from exc else: try: token = decode_string_base64(token) @@ -141,7 +140,7 @@ async def __call__( self, authorization: str = auth_verifier, authenticator: Authenticator = Depends(get_authenticator), - ) -> str | dict | None: + ) -> Union[str, dict, None]: """ Verifies the authorization token using the specified version and an Authenticator for version endpoints. @@ -194,19 +193,19 @@ async def __call__( if version.value.startswith("2.2"): try: token = decode_string_base64(token) - except UnicodeDecodeError: + except UnicodeDecodeError as exc: logger.debug( - "Token `%s` cannot be decoded. " - "Check if the token is already encoded." % token + f"Token `{token}` cannot be decoded. " + "Check if the token is already encoded." ) - raise AuthorizationOCPIError + raise AuthorizationOCPIError from exc await authenticator.authenticate(token) - except IndexError: + except IndexError as exc: logger.debug( - "Token `%s` cannot be split in parts. " + "Token cannot be split in parts. " "Check if it starts with `Token `" ) - raise AuthorizationOCPIError + raise AuthorizationOCPIError from exc class WSPushVerifier: @@ -233,7 +232,7 @@ async def __call__( :raises AuthorizationOCPIError: If there is an issue with the authorization token. """ - if settings.NO_AUTH and token == "": + if settings.NO_AUTH and token == "": # nosec logger.debug("Authentication skipped due to NO_AUTH setting.") return True @@ -245,12 +244,12 @@ async def __call__( if version.value.startswith("2.2"): try: token = decode_string_base64(token) - except UnicodeDecodeError: + except UnicodeDecodeError as exc: logger.debug( - "Token `%s` cannot be decoded. " - "Check if the token is already encoded." % token + f"Token `{token}` cannot be decoded. " + "Check if the token is already encoded." ) - raise AuthorizationOCPIError + raise AuthorizationOCPIError from exc await authenticator.authenticate(token) - except AuthorizationOCPIError: - raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION) + except AuthorizationOCPIError as exc: + raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION) from exc diff --git a/py_ocpi/core/push.py b/py_ocpi/core/push.py index f7ab3e9..f9a4383 100644 --- a/py_ocpi/core/push.py +++ b/py_ocpi/core/push.py @@ -104,16 +104,15 @@ async def push_object( async with httpx.AsyncClient() as client: logger.info( - "Send request to get version details: %s" - % receiver.endpoints_url + f"Send request to get version details: {receiver.endpoints_url}" ) response = await client.get( receiver.endpoints_url, headers={"authorization": client_auth_token}, ) - logger.info("Response status_code - `%s`" % response.status_code) + logger.info(f"Response status_code - `{response.status_code}`") endpoints = response.json()["data"]["endpoints"] - logger.debug("Endpoints response data - `%s`" % endpoints) + logger.debug(f"Endpoints response data - `{endpoints}`") # get object data if push.module_id == ModuleID.tokens: @@ -126,7 +125,7 @@ async def push_object( version=version, ) else: - logger.debug("Requested module with push is `%s`." % push.module_id) + logger.debug(f"Requested module with push is `{push.module_id}`.") data = await crud.get( push.module_id, RoleEnum.cpo, @@ -162,7 +161,7 @@ async def push_object( ) ) result = PushResponse(receiver_responses=receiver_responses) - logger.debug("Result of push operation - %s" % result.dict()) + logger.debug(f"Result of push operation - {result.dict()}") return result @@ -186,7 +185,7 @@ async def http_push_to_client( adapter: Adapter = Depends(get_adapter), ): logger.info("Received push http request.") - logger.debug("Received push data - `%s`" % push.dict()) + logger.debug(f"Received push data - `{push.dict()}`") auth_token = get_auth_token(request, version) return await push_object(version, push, crud, adapter, auth_token) @@ -210,10 +209,10 @@ async def websocket_push_to_client( while True: data = await websocket.receive_json() - logger.debug("Received data through ws - `%s`" % data) + logger.debug(f"Received data through ws - `{data}`") push = Push(**data) push_response = await push_object( version, push, crud, adapter, auth_token ) - logger.debug("Sending push response - `%s`" % push_response.dict()) + logger.debug(f"Sending push response - `{push_response.dict()}`") await websocket.send_json(push_response.dict()) diff --git a/py_ocpi/core/utils.py b/py_ocpi/core/utils.py index d06e464..940f10e 100644 --- a/py_ocpi/core/utils.py +++ b/py_ocpi/core/utils.py @@ -87,7 +87,7 @@ def get_module_model(class_name, module_name: str, version_name: str) -> Any: try: module = importlib.import_module(module_dir) return getattr(module, class_name) - except ImportError: + except ImportError as exc: raise NotImplementedError( f"{class_name} schema for version {version_name} not found.", - ) + ) from exc diff --git a/py_ocpi/main.py b/py_ocpi/main.py index c19a4a0..55793da 100644 --- a/py_ocpi/main.py +++ b/py_ocpi/main.py @@ -10,7 +10,7 @@ ) from py_ocpi.core.endpoints import ENDPOINTS -from py_ocpi.modules.versions import router as versions_router +from py_ocpi.modules.versions.main import router as versions_router from py_ocpi.modules.versions.enums import VersionNumber from py_ocpi.modules.versions.schemas import Version from py_ocpi.core.dependencies import ( @@ -39,8 +39,8 @@ class ExceptionHandlerMiddleware(BaseHTTPMiddleware): async def dispatch( self, request: Request, call_next: RequestResponseEndpoint ): - logger.debug("%s: %s" % (request.method, request.url)) - logger.debug("Request headers - %s" % request.headers) + logger.debug(f"{request.method}: {request.url}") + logger.debug(f"Request headers - {request.headers}" ) try: response = await call_next(request) @@ -64,7 +64,7 @@ async def dispatch( **status.OCPI_3000_GENERIC_SERVER_ERROR, ).dict() ) - except Exception as e: + except Exception as e: # pylint: disable=broad-exception-caught logger.warning(f"Unknown exception: {str(e)}.") response = JSONResponse( OCPIResponse( @@ -77,7 +77,7 @@ async def dispatch( return response -def get_application( +def get_application( # noqa: MC0001 version_numbers: List[VersionNumber], roles: List[RoleEnum], crud: Any, diff --git a/py_ocpi/modules/versions/main.py b/py_ocpi/modules/versions/main.py index d618101..3a88a81 100644 --- a/py_ocpi/modules/versions/main.py +++ b/py_ocpi/modules/versions/main.py @@ -28,7 +28,7 @@ async def get_versions( request: Request, versions=Depends(get_versions_), crud: Crud = Depends(get_crud), - server_cred: str | dict | None = Depends(cred_dependency), + server_cred: Unione[str, dict, None] = Depends(cred_dependency), ): """ Get OCPI Versions. @@ -38,7 +38,7 @@ async def get_versions( **Returns:** The OCPIResponse containing a list of available OCPI versions. """ - logger.info("Received request for version details: %s" % request.url) + logger.info(f"Received request for version details: {request.url}") if server_cred is None: logger.debug("Unauthorized request.") raise HTTPException(