diff --git a/backend/README.md b/backend/README.md index 2c86bdad28..42ae4fd64a 100644 --- a/backend/README.md +++ b/backend/README.md @@ -138,7 +138,7 @@ If you use GitHub Actions the tests will run automatically. If your stack is already up and you just want to run the tests, you can use: ```bash -docker compose exec backend /app/tests-start.sh +docker compose exec backend bash /app/tests-start.sh ``` That `/app/tests-start.sh` script just calls `pytest` after making sure that the rest of the stack is running. If you need to pass extra arguments to `pytest`, you can pass them to that command and they will be forwarded. diff --git a/backend/app/api/deps.py b/backend/app/api/deps.py index b6e05c1776..5bef1169aa 100644 --- a/backend/app/api/deps.py +++ b/backend/app/api/deps.py @@ -3,7 +3,7 @@ from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer -from jose import jwt +from jose import JWTError, jwt from pydantic import ValidationError from sqlmodel import Session @@ -32,7 +32,7 @@ def get_current_user(session: SessionDep, token: TokenDep) -> User: token, settings.SECRET_KEY, algorithms=[security.ALGORITHM] ) token_data = TokenPayload(**payload) - except (jwt.JWTError, ValidationError): + except (JWTError, ValidationError): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Could not validate credentials", diff --git a/backend/app/api/routes/login.py b/backend/app/api/routes/login.py index 30831928f2..8fb65c42e0 100644 --- a/backend/app/api/routes/login.py +++ b/backend/app/api/routes/login.py @@ -12,7 +12,8 @@ from app.models import Message, NewPassword, Token, User, UserOut from app.utils import ( generate_password_reset_token, - send_reset_password_email, + generate_reset_password_email, + send_email, verify_password_reset_token, ) @@ -62,9 +63,14 @@ def recover_password(email: str, session: SessionDep) -> Message: detail="The user with this username does not exist in the system.", ) password_reset_token = generate_password_reset_token(email=email) - send_reset_password_email( + email_data = generate_reset_password_email( email_to=user.email, email=email, token=password_reset_token ) + send_email( + email_to=user.email, + subject=email_data.subject, + html_content=email_data.html_content, + ) return Message(message="Password recovery email sent") diff --git a/backend/app/api/routes/users.py b/backend/app/api/routes/users.py index 0850c929f6..19865908fc 100644 --- a/backend/app/api/routes/users.py +++ b/backend/app/api/routes/users.py @@ -23,7 +23,7 @@ UserUpdate, UserUpdateMe, ) -from app.utils import send_new_account_email +from app.utils import generate_new_account_email, send_email router = APIRouter() @@ -61,9 +61,14 @@ def create_user(*, session: SessionDep, user_in: UserCreate) -> Any: user = crud.create_user(session=session, user_create=user_in) if settings.EMAILS_ENABLED and user_in.email: - send_new_account_email( + email_data = generate_new_account_email( email_to=user_in.email, username=user_in.email, password=user_in.password ) + send_email( + email_to=user_in.email, + subject=email_data.subject, + html_content=email_data.html_content, + ) return user @@ -185,7 +190,9 @@ def delete_user( if not user: raise HTTPException(status_code=404, detail="User not found") - if (user == current_user and not current_user.is_superuser) or (user != current_user and current_user.is_superuser): + if (user == current_user and not current_user.is_superuser) or ( + user != current_user and current_user.is_superuser + ): statement = delete(Item).where(Item.owner_id == user_id) session.exec(statement) session.delete(user) diff --git a/backend/app/api/routes/utils.py b/backend/app/api/routes/utils.py index ee8d105fdf..072f948f6a 100644 --- a/backend/app/api/routes/utils.py +++ b/backend/app/api/routes/utils.py @@ -4,7 +4,7 @@ from app.api.deps import get_current_active_superuser from app.core.celery_app import celery_app from app.models import Message -from app.utils import send_test_email +from app.utils import generate_test_email, send_email router = APIRouter() @@ -31,5 +31,10 @@ def test_email(email_to: EmailStr) -> Message: """ Test emails. """ - send_test_email(email_to=email_to) + email_data = generate_test_email(email_to=email_to) + send_email( + email_to=email_to, + subject=email_data.subject, + html_content=email_data.html_content, + ) return Message(message="Test email sent") diff --git a/backend/app/tests/api/api_v1/test_login.py b/backend/app/tests/api/api_v1/test_login.py index 5415fd83df..ec099eed44 100644 --- a/backend/app/tests/api/api_v1/test_login.py +++ b/backend/app/tests/api/api_v1/test_login.py @@ -39,8 +39,8 @@ def test_use_access_token( def test_recovery_password( client: TestClient, normal_user_token_headers: dict[str, str], mocker ) -> None: - mocker.patch("app.utils.send_reset_password_email", return_value=None) mocker.patch("app.utils.send_email", return_value=None) + mocker.patch("app.core.config.settings.EMAILS_ENABLED", True) email = "test@example.com" r = client.post( f"{settings.API_V1_STR}/password-recovery/{email}", @@ -75,7 +75,7 @@ def test_reset_password( r = client.post( f"{settings.API_V1_STR}/reset-password/", headers=superuser_token_headers, - json=data + json=data, ) assert r.status_code == 200 assert r.json() == {"message": "Password updated successfully"} @@ -88,7 +88,7 @@ def test_reset_password_invalid_token( r = client.post( f"{settings.API_V1_STR}/reset-password/", headers=superuser_token_headers, - json=data + json=data, ) response = r.json() diff --git a/backend/app/tests/api/api_v1/test_users.py b/backend/app/tests/api/api_v1/test_users.py index 69b66579a0..ae2138e578 100644 --- a/backend/app/tests/api/api_v1/test_users.py +++ b/backend/app/tests/api/api_v1/test_users.py @@ -32,7 +32,7 @@ def test_get_users_normal_user_me( def test_create_user_new_email( client: TestClient, superuser_token_headers: dict, db: Session, mocker ) -> None: - mocker.patch("app.utils.send_new_account_email") + mocker.patch("app.utils.send_email") mocker.patch("app.core.config.settings.EMAILS_ENABLED", True) username = random_email() password = random_lower_string() @@ -68,9 +68,7 @@ def test_get_existing_user( assert existing_user.email == api_user["email"] -def test_get_existing_user_current_user( - client: TestClient, db: Session -) -> None: +def test_get_existing_user_current_user(client: TestClient, db: Session) -> None: username = random_email() password = random_lower_string() user_in = UserCreate(email=username, password=password) @@ -184,7 +182,10 @@ def test_update_password_me( client: TestClient, superuser_token_headers: dict, db: Session ) -> None: new_password = random_lower_string() - data = {"current_password": settings.FIRST_SUPERUSER_PASSWORD, "new_password": new_password} + data = { + "current_password": settings.FIRST_SUPERUSER_PASSWORD, + "new_password": new_password, + } r = client.patch( f"{settings.API_V1_STR}/users/me/password", headers=superuser_token_headers, @@ -195,7 +196,10 @@ def test_update_password_me( assert updated_user["message"] == "Password updated successfully" # Revert to the old password to keep consistency in test - old_data = {"current_password": new_password, "new_password": settings.FIRST_SUPERUSER_PASSWORD} + old_data = { + "current_password": new_password, + "new_password": settings.FIRST_SUPERUSER_PASSWORD, + } r = client.patch( f"{settings.API_V1_STR}/users/me/password", headers=superuser_token_headers, @@ -222,7 +226,10 @@ def test_update_password_me_incorrect_password( def test_update_password_me_same_password_error( client: TestClient, superuser_token_headers: dict, db: Session ) -> None: - data = {"current_password": settings.FIRST_SUPERUSER_PASSWORD, "new_password": settings.FIRST_SUPERUSER_PASSWORD} + data = { + "current_password": settings.FIRST_SUPERUSER_PASSWORD, + "new_password": settings.FIRST_SUPERUSER_PASSWORD, + } r = client.patch( f"{settings.API_V1_STR}/users/me/password", headers=superuser_token_headers, @@ -230,12 +237,12 @@ def test_update_password_me_same_password_error( ) assert r.status_code == 400 updated_user = r.json() - assert updated_user["detail"] == "New password cannot be the same as the current one" + assert ( + updated_user["detail"] == "New password cannot be the same as the current one" + ) -def test_create_user_open( - client: TestClient, mocker -) -> None: +def test_create_user_open(client: TestClient, mocker) -> None: mocker.patch("app.core.config.settings.USERS_OPEN_REGISTRATION", True) username = random_email() password = random_lower_string() @@ -251,9 +258,7 @@ def test_create_user_open( assert created_user["full_name"] == full_name -def test_create_user_open_forbidden_error( - client: TestClient, mocker -) -> None: +def test_create_user_open_forbidden_error(client: TestClient, mocker) -> None: mocker.patch("app.core.config.settings.USERS_OPEN_REGISTRATION", False) username = random_email() password = random_lower_string() @@ -267,19 +272,23 @@ def test_create_user_open_forbidden_error( assert r.json()["detail"] == "Open user registration is forbidden on this server" -def test_create_user_open_already_exists_error( - client: TestClient, mocker -) -> None: +def test_create_user_open_already_exists_error(client: TestClient, mocker) -> None: mocker.patch("app.core.config.settings.USERS_OPEN_REGISTRATION", True) password = random_lower_string() full_name = random_lower_string() - data = {"email": settings.FIRST_SUPERUSER, "password": password, "full_name": full_name} + data = { + "email": settings.FIRST_SUPERUSER, + "password": password, + "full_name": full_name, + } r = client.post( f"{settings.API_V1_STR}/users/open", json=data, ) assert r.status_code == 400 - assert r.json()["detail"] == "The user with this username already exists in the system" + assert ( + r.json()["detail"] == "The user with this username already exists in the system" + ) def test_update_user( @@ -311,7 +320,9 @@ def test_update_user_not_exists( json=data, ) assert r.status_code == 404 - assert r.json()["detail"] == "The user with this username does not exist in the system" + assert ( + r.json()["detail"] == "The user with this username does not exist in the system" + ) def test_delete_user_super_user( @@ -331,9 +342,7 @@ def test_delete_user_super_user( assert deleted_user["message"] == "User deleted successfully" -def test_delete_user_current_user( - client: TestClient, db: Session -) -> None: +def test_delete_user_current_user(client: TestClient, db: Session) -> None: username = random_email() password = random_lower_string() user_in = UserCreate(email=username, password=password) diff --git a/backend/app/utils.py b/backend/app/utils.py index 05b75b37c6..ff228cbf14 100644 --- a/backend/app/utils.py +++ b/backend/app/utils.py @@ -1,26 +1,38 @@ import logging +from dataclasses import dataclass from datetime import datetime, timedelta from pathlib import Path from typing import Any import emails -from emails.template import JinjaTemplate -from jose import jwt +from jinja2 import Template +from jose import JWTError, jwt from app.core.config import settings +@dataclass +class EmailData: + html_content: str + subject: str + + +def render_email_template(*, template_name: str, context: dict[str, Any]): + template_str = (Path(settings.EMAIL_TEMPLATES_DIR) / template_name).read_text() + html_content = Template(template_str).render(context) + return html_content + + def send_email( + *, email_to: str, - subject_template: str = "", - html_template: str = "", - environment: dict[str, Any] | None = None, + subject: str = "", + html_content: str = "", ) -> None: - current_environment = environment or {} assert settings.EMAILS_ENABLED, "no provided configuration for email variables" message = emails.Message( - subject=JinjaTemplate(subject_template), - html=JinjaTemplate(html_template), + subject=subject, + html=html_content, mail_from=(settings.EMAILS_FROM_NAME, settings.EMAILS_FROM_EMAIL), ) smtp_options = {"host": settings.SMTP_HOST, "port": settings.SMTP_PORT} @@ -30,35 +42,28 @@ def send_email( smtp_options["user"] = settings.SMTP_USER if settings.SMTP_PASSWORD: smtp_options["password"] = settings.SMTP_PASSWORD - response = message.send(to=email_to, render=current_environment, smtp=smtp_options) + response = message.send(to=email_to, smtp=smtp_options) logging.info(f"send email result: {response}") -def send_test_email(email_to: str) -> None: +def generate_test_email(email_to: str) -> EmailData: project_name = settings.PROJECT_NAME subject = f"{project_name} - Test email" - with open(Path(settings.EMAIL_TEMPLATES_DIR) / "test_email.html") as f: - template_str = f.read() - send_email( - email_to=email_to, - subject_template=subject, - html_template=template_str, - environment={"project_name": settings.PROJECT_NAME, "email": email_to}, + html_content = render_email_template( + template_name="test_email.html", + context={"project_name": settings.PROJECT_NAME, "email": email_to}, ) + return EmailData(html_content=html_content, subject=subject) -def send_reset_password_email(email_to: str, email: str, token: str) -> None: +def generate_reset_password_email(email_to: str, email: str, token: str) -> EmailData: project_name = settings.PROJECT_NAME subject = f"{project_name} - Password recovery for user {email}" - with open(Path(settings.EMAIL_TEMPLATES_DIR) / "reset_password.html") as f: - template_str = f.read() server_host = settings.SERVER_HOST link = f"{server_host}/reset-password?token={token}" - send_email( - email_to=email_to, - subject_template=subject, - html_template=template_str, - environment={ + html_content = render_email_template( + template_name="reset_password.html", + context={ "project_name": settings.PROJECT_NAME, "username": email, "email": email_to, @@ -66,19 +71,18 @@ def send_reset_password_email(email_to: str, email: str, token: str) -> None: "link": link, }, ) + return EmailData(html_content=html_content, subject=subject) -def send_new_account_email(email_to: str, username: str, password: str) -> None: +def generate_new_account_email( + email_to: str, username: str, password: str +) -> EmailData: project_name = settings.PROJECT_NAME subject = f"{project_name} - New account for user {username}" - with open(Path(settings.EMAIL_TEMPLATES_DIR) / "new_account.html") as f: - template_str = f.read() link = settings.SERVER_HOST - send_email( - email_to=email_to, - subject_template=subject, - html_template=template_str, - environment={ + html_content = render_email_template( + template_name="new_account.html", + context={ "project_name": settings.PROJECT_NAME, "username": username, "password": password, @@ -86,6 +90,7 @@ def send_new_account_email(email_to: str, username: str, password: str) -> None: "link": link, }, ) + return EmailData(html_content=html_content, subject=subject) def generate_password_reset_token(email: str) -> str: @@ -105,5 +110,5 @@ def verify_password_reset_token(token: str) -> str | None: try: decoded_token = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"]) return decoded_token["sub"] - except jwt.JWTError: + except JWTError: return None