Skip to content

Commit

Permalink
JWTIdentity raises common error JWTIdentityError (#840)
Browse files Browse the repository at this point in the history
  • Loading branch information
Maillol authored Feb 4, 2025
1 parent 2356dc6 commit 23be1ad
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 4 deletions.
3 changes: 2 additions & 1 deletion .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ select = A,B,C,D,E,F,G,H,I,J,K,L,M,N,O,P,Q,R,S,T,U,V,W,X,Y,Z,B901,B902,B903,B950
# W503: Mutually exclusive with W504.
ignore = E226,E501,E722,W503
per-file-ignores =
# B011: assert False used for coverage skipping
# S101: Pytest uses assert
tests/*:S101
tests/*:B011,S101

# flake8-import-order
application-import-names = aiohttp_security
Expand Down
15 changes: 12 additions & 3 deletions aiohttp_security/jwt_identity.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"""

from typing import Optional
from typing import Optional, Tuple, Type

from aiohttp import web

Expand All @@ -11,14 +11,23 @@
try:
import jwt
HAS_JWT = True
_bases_error: Tuple[Type[jwt.exceptions.PyJWTError], ...]
_bases_error = (jwt.exceptions.PyJWTError,)
except ImportError: # pragma: no cover
HAS_JWT = False
_bases_error = ()


AUTH_HEADER_NAME = 'Authorization'
AUTH_SCHEME = 'Bearer '


# This class inherits from ValueError to maintain backward compatibility
# with previous versions of aiohttp-security
class InvalidAuthorizationScheme(ValueError, *_bases_error): # type: ignore[misc]
"""Exception when the auth method can't be read from header."""


class JWTIdentityPolicy(AbstractIdentityPolicy):
def __init__(self, secret: str, algorithm: str = "HS256", key: str = "login"):
if not HAS_JWT:
Expand All @@ -34,8 +43,8 @@ async def identify(self, request: web.Request) -> Optional[str]:
return None

if not header_identity.startswith(AUTH_SCHEME):
raise ValueError("Invalid authorization scheme. "
+ "Should be `{}<token>`".format(AUTH_SCHEME))
raise InvalidAuthorizationScheme("Invalid authorization scheme. "
"Should be `{}<token>`".format(AUTH_SCHEME))

token = header_identity.split(' ')[1].strip()

Expand Down
25 changes: 25 additions & 0 deletions tests/test_jwt_identity.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,28 @@ async def check(request):
resp = await client.get('/', headers=headers)
assert 400 == resp.status
assert 'Invalid authorization scheme' in resp.reason


async def test_identify_expired_signature(make_token, aiohttp_client):
kwt_secret_key = "Key" # noqa: S105

token = make_token({"login": "Andrew", "exp": 0}, kwt_secret_key)

async def check(request):
policy = request.app[IDENTITY_KEY]
try:
await policy.identify(request)
except jwt.exceptions.PyJWTError as exc:
raise web.HTTPBadRequest(reason=str(exc))

assert False

app = web.Application()
_setup(app, JWTIdentityPolicy(kwt_secret_key), Autz())
app.router.add_route("GET", "/", check)

client = await aiohttp_client(app)
headers = {"Authorization": "Bearer {}".format(token)}
resp = await client.get("/", headers=headers)
assert 400 == resp.status
assert "Signature has expired" in resp.reason

0 comments on commit 23be1ad

Please sign in to comment.