Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JWTIdentity raises common error JWTIdentityError #840

Merged
merged 8 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions aiohttp_security/jwt_identity.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,17 @@
AUTH_SCHEME = 'Bearer '


if HAS_JWT:
# This class inherits from ValueError to maintain backward compatibility
# with previous versions of aiohttp-security.
class JWTIdentityError(jwt.exceptions.PyJWTError, ValueError):
Dreamsorcerer marked this conversation as resolved.
Show resolved Hide resolved
pass

else:
class JWTIdentityError(ValueError):
pass

Check warning on line 30 in aiohttp_security/jwt_identity.py

View check run for this annotation

Codecov / codecov/patch

aiohttp_security/jwt_identity.py#L29-L30

Added lines #L29 - L30 were not covered by tests


class JWTIdentityPolicy(AbstractIdentityPolicy):
def __init__(self, secret: str, algorithm: str = "HS256", key: str = "login"):
if not HAS_JWT:
Expand All @@ -34,14 +45,15 @@
return None

if not header_identity.startswith(AUTH_SCHEME):
raise ValueError("Invalid authorization scheme. "
+ "Should be `{}<token>`".format(AUTH_SCHEME))
raise JWTIdentityError("Invalid authorization scheme. "
+ "Should be `{}<token>`".format(AUTH_SCHEME))

token = header_identity.split(' ')[1].strip()

identity = jwt.decode(token,
self.secret,
algorithms=[self.algorithm])

return identity.get(self.key) # type: ignore[no-any-return]

async def remember(self, request: web.Request, response: web.StreamResponse,
Expand Down
25 changes: 25 additions & 0 deletions tests/test_jwt_identity.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,28 @@
resp = await client.get('/', headers=headers)
assert 400 == resp.status
assert 'Invalid authorization scheme' in resp.reason


async def test_identify_expired_signature(make_token, aiohttp_client):
kwt_secret_key = "Key" # noqa: S105

token = make_token({'login': 'Andrew', 'exp': 0}, kwt_secret_key)

async def check(request):
policy = request.app[IDENTITY_KEY]
try:
await policy.identify(request)
except jwt.exceptions.PyJWTError as exc:
raise web.HTTPBadRequest(reason=str(exc))

return web.Response()

Check warning on line 97 in tests/test_jwt_identity.py

View check run for this annotation

Codecov / codecov/patch

tests/test_jwt_identity.py#L97

Added line #L97 was not covered by tests

app = web.Application()
_setup(app, JWTIdentityPolicy(kwt_secret_key), Autz())
app.router.add_route('GET', '/', check)

client = await aiohttp_client(app)
headers = {"Authorization": "Bearer {}".format(token)}
resp = await client.get('/', headers=headers)
assert 400 == resp.status
assert 'Signature has expired' in resp.reason
Loading