-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathmain.py
202 lines (163 loc) · 7.65 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
# FastAPI Educational Project
import datetime
import os
import platform
import subprocess
from fastapi import FastAPI, HTTPException, Depends, status, Request
from sqlmodel import SQLModel, create_engine, Session, select
import rabbitmq as rb
from oauth2 import Token, Oauth2
import json
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
import pytz
from typing import List
import re
from model import ProductoBase, Pedido, PedidoResponse, PedidoPrecioResponse
from typing import Annotated
# Local time zone; handed to oauth.create_access_token when a token is issued.
local_timezone = pytz.timezone('America/Argentina/Buenos_Aires')
# Path of the script launched by the /api/v1/start-service endpoint.
SCRIPT_PATH = "./order_service.py"
# FastAPI application instance.
app = FastAPI(title="IAEW", description="REST Full API TP - Grupo 1 - 2024", version="12.0.0", summary="Use Oauth2 in Postman for Authentication and Authorization")
# Dependency for the authentication scheme.
# tokenUrl must point at the route that actually issues tokens (POST /api/v1/token);
# it is kept relative so the generated OpenAPI docs resolve it against the API root.
Oauth2_scheme = OAuth2PasswordBearer(tokenUrl="api/v1/token")
# SQLite objects and the ORM engine (echo=True logs the emitted SQL statements).
sqlite_file_name = "iaew.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url, echo=True)
# Create the tables for every model registered on SQLModel.metadata.
SQLModel.metadata.create_all(engine)
# Instantiate the Oauth2 helper class.
oauth = Oauth2(algorithm="HS256",expires=5)
# API Endpoints
@app.post("/api/v1/pedido", response_model=Pedido, tags=["API Endpoints"])
async def create_pedido(request: Request, pedido: ProductoBase, token: Annotated[str, Depends(Oauth2_scheme)]):
    """Create an order, persist it and return its stored representation.

    Args:
        request: Incoming request; its URL path is passed to ``oauth.authorization``.
        pedido: Request payload, validated into a ``Pedido`` before being stored.
        token: Bearer token resolved by ``Oauth2_scheme``.

    Returns:
        A dict with the keys ``pedidoId``, ``userId``, ``producto`` (list of
        ``{"producto", "cantidad"}`` items), ``estado``, ``creacion`` and ``total``.
    """
    # NOTE(review): oauth.authorization is defined elsewhere; presumably it
    # raises when the token is not allowed on this path - confirm.
    oauth.authorization(request.url.path, token)

    def extrae_productos(producto_string: str):
        """Parse ``Producto(producto='..', cantidad=..)`` fragments into dicts."""
        pattern = re.compile(r"Producto\(producto='(.*?)', cantidad=(.*?)\)")
        return [
            {"producto": match.group(1), "cantidad": float(match.group(2))}
            for match in pattern.finditer(producto_string)
        ]

    def create_db_output(db_pedido, productos):
        """Build the response payload from a ``Pedido`` row and its parsed products."""
        return {
            "pedidoId": db_pedido.id,
            "userId": db_pedido.userid,
            "producto": productos,
            "estado": db_pedido.estado,
            "creacion": db_pedido.creacion,
            "total": db_pedido.total
        }

    with Session(engine) as session:
        db_pedido = Pedido.model_validate(pedido)
        # Parse before persisting so a malformed product string fails
        # before anything is written to the database.
        productos = extrae_productos(db_pedido.producto)
        session.add(db_pedido)
        session.commit()
        session.refresh(db_pedido)
        # Build the response only after refresh so values assigned by the
        # database on insert are present in the output.
        return create_db_output(db_pedido, productos)
@app.post("/api/v1/producer",tags=["RabbitMQ Process"])
async def publish_pedido(request: Request, token: Annotated[str, Depends(Oauth2_scheme)]):
    """Serialize ``rb.for_publishing`` to JSON and hand it to ``rb.send_message``.

    Args:
        request: Incoming request; its URL path is passed to ``oauth.authorization``.
        token: Bearer token resolved by ``Oauth2_scheme``.

    Returns:
        ``rb.for_publishing`` when the send reports success, otherwise
        ``{"RabbitMQ": <message returned by rb.send_message>}``.

    Raises:
        HTTPException: 400 on ``json.JSONDecodeError``, 422 on ``TypeError``,
            500 on any other exception raised while serializing or sending.
    """
    oauth.authorization(request.url.path, token)
    try:
        payload = json.dumps(rb.for_publishing, indent=2)
        # send_message is unpacked as a (success flag, message) pair.
        success, response_message = rb.send_message(msg=payload)
        if success:
            return rb.for_publishing
        return {"RabbitMQ": response_message}
    except json.JSONDecodeError:
        raise HTTPException(status_code=400, detail="Error al decodificar formato JSON")
    except TypeError as err:
        raise HTTPException(status_code=422, detail="Error de tipo: " + str(err))
    except Exception as err:
        raise HTTPException(status_code=500, detail="Error: " + str(err))
@app.get("/api/v1/pedidos", response_model=list[PedidoResponse], tags=["API Endpoints"])
async def read_pedidos(request: Request, token: Annotated[str, Depends(Oauth2_scheme)]):
    """Return every stored order with its product string parsed into items.

    Args:
        request: Incoming request; its URL path is passed to ``oauth.authorization``.
        token: Bearer token resolved by ``Oauth2_scheme``.

    Returns:
        A list of dicts with the keys ``pedidoId``, ``userId``, ``producto``,
        ``creacion``, ``estado`` and ``total``.
    """
    oauth.authorization(request.url.path, token)

    def parse_productos(produc: str) -> List[dict]:
        """Turn ``Producto(producto='..', cantidad=..)`` fragments into dicts."""
        item_re = re.compile(r"Producto\(producto='(.*?)', cantidad=(.*?)\)")
        items = []
        for found in item_re.finditer(produc):
            items.append({"producto": found.group(1), "cantidad": float(found.group(2))})
        return items

    with Session(engine) as session:
        rows = session.exec(select(Pedido)).all()
        payload = []
        for row in rows:
            payload.append({
                "pedidoId": row.id,
                "userId": row.userid,
                "producto": parse_productos(row.producto),
                "creacion": row.creacion,
                "estado": row.estado,
                "total": row.total
            })
        return payload
@app.get("/api/v1/pedidos/{id}", response_model=Pedido, tags=["API Endpoints"])
async def pedido_by_id(request: Request, id: str, token: Annotated[str, Depends(Oauth2_scheme)]):
    """Return a single order looked up by its id.

    Args:
        request: Incoming request; its path minus the last segment is passed
            to ``oauth.authorization``.
        id: Identifier of the order, taken from the URL path.
        token: Bearer token resolved by ``Oauth2_scheme``.

    Returns:
        A dict with the keys ``pedidoId``, ``userId``, ``producto``,
        ``creacion``, ``estado`` and ``total``.

    Raises:
        HTTPException: 404 when no order has the given id.
    """
    # Authorize against the collection path: strip the trailing "/{id}" segment.
    base_url = request.url.path.rsplit("/", 1)[0]
    oauth.authorization(base_url, token)

    def parse_productos(produc: str) -> List[dict]:
        """Turn ``Producto(producto='..', cantidad=..)`` fragments into dicts."""
        pattern = re.compile(r"Producto\(producto='(.*?)', cantidad=(.*?)\)")
        return [{"producto": match.group(1), "cantidad": float(match.group(2))}
                for match in pattern.finditer(produc)]

    with Session(engine) as session:
        pedido = session.exec(select(Pedido).where(Pedido.id == id)).one_or_none()
        if pedido is None:
            raise HTTPException(status_code=404, detail="El pedido no existe")
        return {
            "pedidoId": pedido.id,
            "userId": pedido.userid,
            "producto": parse_productos(pedido.producto),
            "creacion": pedido.creacion,
            # Include the stored state, as the create and list endpoints do;
            # it was previously left out of this response.
            "estado": pedido.estado,
            "total": pedido.total
        }
@app.post("/api/v1/token", response_model=Token, tags=["API Endpoints"])
async def login(request: Request, form_data: OAuth2PasswordRequestForm = Depends()):
    """Authenticate the submitted credentials and issue an access token.

    Args:
        request: Incoming request (not used by the handler body).
        form_data: Form carrying the ``username`` and ``password`` fields.

    Returns:
        ``{"access_token": <token>, "token_type": "Bearer"}``.

    Raises:
        HTTPException: 401 with the message returned by ``oauth.authentication``
            when it yields no user.
    """
    user_db, msg = oauth.authentication(form_data.username, form_data.password)
    if user_db is not None:
        # Token lifetime comes from the Oauth2 helper's configured minutes.
        lifetime = datetime.timedelta(minutes=oauth.ACCESS_TOKEN_EXPIRE_MINUTES)
        issued_token = oauth.create_access_token(user_db, local_timezone, lifetime)
        return {"access_token": issued_token, "token_type": "Bearer"}

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=msg,
        headers={"WWW-Authenticate": "Bearer"},
    )
@app.get("/api/v1/costo", response_model=list[PedidoPrecioResponse], tags=["API Endpoints"])
async def read_costo_pedidos(request: Request, token: Annotated[str, Depends(Oauth2_scheme)]):
    """Return every stored order including its ``costo`` value.

    Args:
        request: Incoming request; its URL path is passed to ``oauth.authorization``.
        token: Bearer token resolved by ``Oauth2_scheme``.

    Returns:
        A list of dicts with the keys ``pedidoId``, ``userId``, ``producto``,
        ``creacion``, ``total`` and ``costo``.
    """
    oauth.authorization(request.url.path, token)

    def parse_productos(produc: str) -> List[dict]:
        """Turn ``Producto(producto='..', cantidad=..)`` fragments into dicts."""
        matcher = re.compile(r"Producto\(producto='(.*?)', cantidad=(.*?)\)")
        parsed = []
        for hit in matcher.finditer(produc):
            nombre, cantidad = hit.group(1), hit.group(2)
            parsed.append({"producto": nombre, "cantidad": float(cantidad)})
        return parsed

    def to_row(reg) -> dict:
        """Map one ``Pedido`` record to its response dict."""
        return {
            "pedidoId": reg.id,
            "userId": reg.userid,
            "producto": parse_productos(reg.producto),
            "creacion": reg.creacion,
            "total": reg.total,
            "costo": reg.costo
        }

    with Session(engine) as session:
        registros = session.exec(select(Pedido)).all()
        return [to_row(reg) for reg in registros]
@app.post("/api/v1/start-service", tags=["gRPC Process"])
async def start_order_service(request: Request):
    """Launch ``order_service.py`` as a child process and report how it started.

    The child is given one second to finish. If it exits within that window
    its captured output is returned; otherwise it is left running and a
    "running in the background" message is returned.

    Args:
        request: Incoming request (not used by the handler body).

    Returns:
        ``{"output", "error"}`` when the child exited and wrote to stderr,
        ``{"output", "message"}`` when it exited without stderr output, or
        ``{"message"}`` when it is still running after one second.

    Raises:
        HTTPException: 404 when ``SCRIPT_PATH`` is not a file; 400 when the
            interpreter process cannot be started.
    """
    # NOTE(review): unlike the other endpoints in this file, this one performs
    # no oauth.authorization check - confirm that is intended.

    # Function-scope import: tempfile is only needed by this endpoint.
    import tempfile

    if not os.path.isfile(SCRIPT_PATH):
        raise HTTPException(status_code=404, detail="El archivo order_service.py no existe.")

    interpreter = "python" if platform.system() == "Windows" else "python3"

    # Output goes to temporary files rather than pipes: when the service keeps
    # running, the child still owns valid descriptors after this handler
    # returns, so later writes neither block on a full pipe nor fail with a
    # broken pipe once the parent's read ends are gone.
    with tempfile.TemporaryFile(mode="w+") as out_file, tempfile.TemporaryFile(mode="w+") as err_file:
        try:
            # Argument list with the default shell=False: no shell parsing involved.
            process = subprocess.Popen(
                [interpreter, SCRIPT_PATH],
                stdout=out_file,
                stderr=err_file,
            )
        except OSError as e:
            # Raised when the interpreter is missing or cannot be executed.
            raise HTTPException(status_code=400, detail=f"Error al ejecutar Order Service.py: {e}") from e

        try:
            process.wait(timeout=1)
        except subprocess.TimeoutExpired:
            # Still alive after one second: treat it as running in the background.
            return {"message": "Order Service.py está en ejecución en segundo plano on port 50051."}

        # The child exited within the window: read back what it wrote.
        out_file.seek(0)
        err_file.seek(0)
        stdout = out_file.read()
        stderr = err_file.read()

    if stderr:
        return {"output": stdout, "error": stderr}
    return {"output": stdout, "message": "Order Service.py ejecutado en segundo plano on port 50051"}