-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathoauth2.py
132 lines (110 loc) · 4.54 KB
/
oauth2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
from pydantic import BaseModel
import jwt
import datetime
from fastapi import HTTPException, status
class User(BaseModel):
    """Pydantic model describing a user's profile fields.

    Only ``username`` is required; the remaining fields are optional and
    default to ``None``.
    """

    # Required string field.
    username: str
    # Optional; None when not supplied.
    full_name: str | None = None
    # Optional; None when not supplied.
    email: str | None = None
    # Optional tri-state flag: True, False, or None when not supplied.
    # NOTE(review): nothing visible in this file reads this flag — confirm
    # where (or whether) disabled accounts are rejected.
    disabled: bool | None = None
class UserInDB(User):
    """`User` extended with the stored credential field."""

    # Required string field.
    # NOTE(review): the name suggests a password hash, but the sample records
    # in OauthDb.users_db appear to hold plaintext values under this key —
    # confirm and hash before production use.
    hashed_password: str
class Token(BaseModel):
    """Pydantic model with two required string fields describing a token.

    NOTE(review): presumably the response body of the token endpoint
    (e.g. ``token_type="bearer"``) — verify against the caller.
    """

    # The encoded token string.
    access_token: str
    # The token scheme label.
    token_type: str
class OauthDb:
    """In-memory tables backing authentication and authorization.

    ``users_db`` maps a username to that user's record (profile fields,
    stored password value and role list). ``api_registration`` maps an
    endpoint path to the roles permitted to call it.
    """

    # username -> user record
    # NOTE(review): the values under "hashed_password" look like plaintext,
    # not hashes — confirm before using outside a demo.
    users_db = {
        "supervisor": {
            "username": "supervisor",
            "full_name": "Edgardo Scrimaglia",
            "email": "[email protected]",
            "hashed_password": "Iaew-2024$",
            "roles": ["manager", "developer", "operator"],
            "disabled": False,
        },
        "operator": {
            "username": "operator",
            "full_name": "System Operator",
            "email": "[email protected]",
            "hashed_password": "Iaew-2024$",
            "roles": ["operator"],
            "disabled": False,
        },
    }

    # endpoint path -> roles allowed to call it
    api_registration = {
        "/api/v1/pedido": {"roles": ["manager"]},
        "/api/v1/costo": {"roles": ["manager"]},
        "/api/v1/producto": {"roles": ["manager"]},
        "/api/v1/pedidos": {"roles": ["manager", "operator"]},
        "/api/v1/producer": {"roles": ["manager"]},
    }
import datetime
import jwt
class Oauth2:
def __init__(self, algorithm: str = "HS256",expires: int = 30) -> None:
self.SECRET_KEY = "588d27e4efad58a4260b8de9d12262467df10316ecf38d6c42d5202909d89c0b"
self.ALGORITHM = algorithm
self.ACCESS_TOKEN_EXPIRE_MINUTES = expires
def raise_credentials_exception(self, status: int, detail: str) -> HTTPException:
raise HTTPException(
status_code=status,
detail=detail,
headers={"WWW-Authenticate": "Bearer"},
)
def authentication(self, username: str, password: str | None = None, db: OauthDb = OauthDb.users_db) -> tuple[dict,str]:
msg = "User Autenticated"
user_db = db.get(username)
if user_db is None or not password in [user_db.get('hashed_password'),None]:
msg = "Invalid username or password"
return user_db, msg
return user_db, msg
def create_access_token(self, data: dict, timezone, expires_delta: datetime.timedelta = 3600) -> str:
to_encode = {}
to_encode.update({"sub": data.get("username")})
to_encode.update({"roles": data.get("roles")})
expire = datetime.datetime.now(timezone) + expires_delta
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, self.SECRET_KEY, algorithm=self.ALGORITHM)
return encoded_jwt
def authorization(self, endpoint: str, token: str) -> None:
def raise_credentials_exception(status: int, detail: str):
raise HTTPException(
status_code=status,
detail=detail,
headers={"WWW-Authenticate": "Bearer"},
)
if not endpoint in OauthDb.api_registration:
self.raise_credentials_exception(status.HTTP_403_FORBIDDEN, "Invalid Authorization")
token_data = self.decode_token(token=token)
user_db, _ = self.authentication(token_data.get("username"))
if user_db is None:
raise_credentials_exception(status.HTTP_401_UNAUTHORIZED, "Invalid username or password")
user = OauthDb.users_db.get(token_data.get("username"))
if not user:
raise_credentials_exception(status.HTTP_401_UNAUTHORIZED, "Could not validate credentials")
roles = token_data.get("roles")
roles_db = OauthDb.api_registration.get(endpoint).get("roles")
if len(roles) == 0 or len(set(roles_db).intersection(set(roles))) == 0:
self.raise_credentials_exception(status.HTTP_403_FORBIDDEN, "Invalid Authorization")
def decode_token(self,token) -> dict:
try:
token_data = {}
payload = jwt.decode(token, self.SECRET_KEY, algorithms=[self.ALGORITHM])
username = payload.get("sub")
roles = payload.get("roles")
if username is None:
self.raise_credentials_exception(status.HTTP_401_UNAUTHORIZED, "Invalid username or password")
token_data.update({"username":username, "roles": roles})
except (jwt.PyJWTError, jwt.ExpiredSignatureError) as error:
self.raise_credentials_exception(status.HTTP_401_UNAUTHORIZED, "Could not validate credentials")
return token_data