Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

General cleanup for version 6.0.7 #74

Merged
merged 11 commits into from
Oct 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions backend/api/auth/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
__all__ = ["LoginRequest", "LogoutRequest", "SetPasswordRequest"]

from .login_request import LoginRequest
from .logout_request import LogoutRequest
from .set_password_request import SetPasswordRequest
80 changes: 1 addition & 79 deletions backend/api/auth.py → backend/api/auth/login_request.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,13 @@
import time

from fastapi import Header, Request, Response
from fastapi import Request
from pydantic import Field

import nebula
from server.clientinfo import get_real_ip
from server.dependencies import CurrentUser
from server.models import RequestModel, ResponseModel
from server.request import APIRequest
from server.session import Session
from server.utils import parse_access_token

#
# Models
#


class LoginRequestModel(RequestModel):
Expand All @@ -40,16 +34,6 @@ class LoginResponseModel(ResponseModel):
)


class PasswordRequestModel(RequestModel):
login: str | None = Field(None, title="Login", examples=["admin"])
password: str = Field(..., title="Password", examples=["Password.123"])


#
# Request
#


async def check_failed_login(ip_address: str) -> None:
banned_until = await nebula.redis.get("banned-ip-until", ip_address)
if banned_until is None:
Expand Down Expand Up @@ -121,65 +105,3 @@ async def handle(

session = await Session.create(user, request)
return LoginResponseModel(access_token=session.token)


class LogoutRequest(APIRequest):
"""Log out the current user.

This request will invalidate the access token used in the Authorization header.
"""

name: str = "logout"
title: str = "Logout"

async def handle(self, authorization: str | None = Header(None)) -> None:
if not authorization:
raise nebula.UnauthorizedException("No authorization header provided")

access_token = parse_access_token(authorization)
if not access_token:
raise nebula.UnauthorizedException("Invalid authorization header provided")

await Session.delete(access_token)

raise nebula.UnauthorizedException("Logged out")


class SetPassword(APIRequest):
"""Set a new password for the current (or a given) user.

Normal users can only change their own password.

In order to set a password for another user,
the current user must be an admin, otherwise a 403 error is returned.
"""

name: str = "password"
title: str = "Set password"

async def handle(
self,
request: PasswordRequestModel,
user: CurrentUser,
) -> Response:
if request.login:
if not user.is_admin:
raise nebula.ForbiddenException(
"Only admin can change other user's password"
)
query = "SELECT meta FROM users WHERE login = $1"
async for row in nebula.db.iterate(query, request.login):
target_user = nebula.User.from_row(row)
break
else:
raise nebula.NotFoundException(f"User {request.login} not found")
else:
target_user = user

if len(request.password) < 8:
raise nebula.BadRequestException("Password is too short")

target_user.set_password(request.password)
await target_user.save()

return Response(status_code=204)
28 changes: 28 additions & 0 deletions backend/api/auth/logout_request.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from fastapi import Header

import nebula
from server.request import APIRequest
from server.session import Session
from server.utils import parse_access_token


class LogoutRequest(APIRequest):
"""Log out the current user.

This request will invalidate the access token used in the Authorization header.
"""

name = "logout"
title = "Logout"

async def handle(self, authorization: str | None = Header(None)) -> None:
if not authorization:
raise nebula.UnauthorizedException("No authorization header provided")

access_token = parse_access_token(authorization)
if not access_token:
raise nebula.UnauthorizedException("Invalid authorization header provided")

await Session.delete(access_token)

raise nebula.UnauthorizedException("Logged out")
52 changes: 52 additions & 0 deletions backend/api/auth/set_password_request.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from fastapi import Response
from pydantic import Field

import nebula
from server.dependencies import CurrentUser
from server.models import RequestModel
from server.request import APIRequest


class PasswordRequestModel(RequestModel):
login: str | None = Field(None, title="Login", examples=["admin"])
password: str = Field(..., title="Password", examples=["Password.123"])


class SetPasswordRequest(APIRequest):
"""Set a new password for the current (or a given) user.

Normal users can only change their own password.

In order to set a password for another user,
the current user must be an admin, otherwise a 403 error is returned.
"""

name = "password"
title = "Set password"

async def handle(
self,
request: PasswordRequestModel,
user: CurrentUser,
) -> Response:
if request.login:
if not user.is_admin:
raise nebula.ForbiddenException(
"Only admin can change other user's password"
)
query = "SELECT meta FROM users WHERE login = $1"
async for row in nebula.db.iterate(query, request.login):
target_user = nebula.User.from_row(row)
break
else:
raise nebula.NotFoundException(f"User {request.login} not found")
else:
target_user = user

if len(request.password) < 8:
raise nebula.BadRequestException("Password is too short")

target_user.set_password(request.password)
await target_user.save()

return Response(status_code=204)
3 changes: 2 additions & 1 deletion backend/api/browse.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,8 @@ def build_query(
class Request(APIRequest):
"""Browse the assets database."""

name: str = "browse"
name = "browse"
title = "Browse assets"
response_model = BrowseResponseModel

async def handle(
Expand Down
6 changes: 3 additions & 3 deletions backend/api/delete.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ class DeleteRequestModel(RequestModel):


class Request(APIRequest):
"""Delete object(s)"""
"""Delete one or multiple objects from the database"""

name: str = "delete"
title: str = "Delete objects"
name = "delete"
title = "Delete objects"
responses: list[int] = [204, 401, 403]

async def handle(
Expand Down
8 changes: 3 additions & 5 deletions backend/api/get.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ def can_access_object(user: nebula.User, meta: dict[str, Any]) -> bool:
if user.is_admin or (user.id in meta.get("assignees", [])):
return True
elif user.is_limited:
if meta.get("created_by") != user.id:
return False
return True
return meta.get("created_by") == user.id
if id_folder := meta.get("id_folder"):
# Users can view assets in folders they have access to
return user.can("asset_view", id_folder)
Expand All @@ -62,8 +60,8 @@ def can_access_object(user: nebula.User, meta: dict[str, Any]) -> bool:
class Request(APIRequest):
"""Get a list of objects"""

name: str = "get"
title: str = "Get objects"
name = "get"
title = "Get objects"
response_model = GetResponseModel

async def handle(
Expand Down
105 changes: 2 additions & 103 deletions backend/api/init/__init__.py
Original file line number Diff line number Diff line change
@@ -1,104 +1,3 @@
from typing import Any, Literal
__all__ = ["InitRequest"]

import fastapi
from pydantic import Field

import nebula
from nebula.plugins.frontend import PluginItemModel, get_frontend_plugins
from nebula.settings import load_settings
from nebula.settings.common import LanguageCode
from server.context import ScopedEndpoint, server_context
from server.dependencies import CurrentUserOptional
from server.models import ResponseModel
from server.request import APIRequest

from .settings import ClientSettingsModel, get_client_settings


class InitResponseModel(ResponseModel):
installed: Literal[True] | None = Field(
True,
title="Installed",
description="Is Nebula installed?",
)

motd: str | None = Field(
None,
title="Message of the day",
description="Server welcome string (displayed on login page)",
)

user: dict[str, Any] | None = Field(
None,
title="User data",
description="User data if user is logged in",
)

settings: ClientSettingsModel | None = Field(
None,
title="Client settings",
)

frontend_plugins: list[PluginItemModel] = Field(
default_factory=list,
title="Frontend plugins",
description="List of plugins available for the web frontend",
)

scoped_endpoints: list[ScopedEndpoint] = Field(default_factory=list)

oauth2_options: list[dict[str, Any]] = Field(
default_factory=list,
title="OAuth2 options",
)


class Request(APIRequest):
"""Initial client request to ensure user is logged in.

If a valid access token is provided, user data is returned,
in the result.

Additionally (regadless the authorization), a message of the day
(motd) and OAuth2 options are returned.
"""

name: str = "init"
title: str = "Login"
response_model = InitResponseModel

async def handle(
self,
request: fastapi.Request,
user: CurrentUserOptional,
) -> InitResponseModel:
default_motd = f"Nebula {nebula.__version__} @ {nebula.config.site_name}"
motd = nebula.config.motd or default_motd

# Nebula is not installed. Frontend should display
# an error message or redirect to the installation page.
if not nebula.settings.installed:
await load_settings()
if not nebula.settings.installed:
return InitResponseModel(installed=False)

# Not logged in. Only return motd and oauth2 options.
if user is None:
return InitResponseModel(motd=motd)

# TODO: get preferred user language
lang: LanguageCode = user.language

# Construct client settings
client_settings = await get_client_settings(lang)
client_settings.server_url = f"{request.url.scheme}://{request.url.netloc}"
plugins = get_frontend_plugins()

return InitResponseModel(
installed=True,
motd=motd,
user=user.meta,
settings=client_settings,
frontend_plugins=plugins,
scoped_endpoints=server_context.scoped_endpoints,
)
from .init_request import InitRequest
File renamed without changes.
Loading