-
Notifications
You must be signed in to change notification settings - Fork 4
/
auth_utils.py
62 lines (51 loc) · 2.03 KB
/
auth_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import jwt
from fastapi import HTTPException
from passlib.context import CryptContext
from datetime import datetime, timedelta
from decouple import config
JWT_KEY = config('JWT_KEY')
class AuthJwtCsrf():
pwd_ctx = CryptContext(schemes=["bcrypt"], deprecated="auto")
secret_key = JWT_KEY
def generate_hashed_pw(self, password) -> str:
return self.pwd_ctx.hash(password)
def verify_pw(self, plain_pw, hashed_pw) -> bool:
return self.pwd_ctx.verify(plain_pw, hashed_pw)
def encode_jwt(self, email) -> str:
payload = {
'exp': datetime.utcnow() + timedelta(days=0, minutes=5),
'iat': datetime.utcnow(),
'sub': email
}
return jwt.encode(
payload,
self.secret_key,
algorithm='HS256'
)
def decode_jwt(self, token) -> str:
try:
payload = jwt.decode(token, self.secret_key, algorithms=['HS256'])
return payload['sub']
except jwt.ExpiredSignatureError:
raise HTTPException(
status_code=401, detail='The JWT has expired')
except jwt.InvalidTokenError as e:
raise HTTPException(status_code=401, detail='JWT is not valid')
def verify_jwt(self, request) -> str:
token = request.cookies.get("access_token")
if not token:
raise HTTPException(
status_code=401, detail='No JWT exist: may not set yet or deleted')
_, _, value = token.partition(" ")
subject = self.decode_jwt(value)
return subject
def verify_update_jwt(self, request) -> tuple[str, str]:
subject = self.verify_jwt(request)
new_token = self.encode_jwt(subject)
return new_token, subject
def verify_csrf_update_jwt(self, request, csrf_protect, headers) -> str:
csrf_token = csrf_protect.get_csrf_from_headers(headers)
csrf_protect.validate_csrf(csrf_token)
subject = self.verify_jwt(request)
new_token = self.encode_jwt(subject)
return new_token