-
I am doing server-side session authentication. I have 2 groups of users - "users" / administrators and customers. A person may be an administrator and a user at the same time. So these are 2 different entities. |
Beta Was this translation helpful? Give feedback.
Replies: 2 comments 1 reply
-
My first instinct would be to use a single auth middleware and use roles to authorize routes. Something like this:
from typing import Any, Dict, Optional, Set
import msgspec
from typing_extensions import Annotated, TypeAlias
from litestar import Litestar, Request, get, post
from litestar.connection import ASGIConnection
from litestar.dto import DTOData, DTOField, Mark, MsgspecDTO
from litestar.exceptions import NotAuthorizedException, PermissionDeniedException
from litestar.handlers import BaseRouteHandler
from litestar.middleware.session.client_side import ClientSideSessionBackend, CookieBackendConfig
from litestar.security.session_auth import SessionAuth
from litestar.testing import TestClient
from litestar.types import Guard
# Connection type with all four generic parameters left as ``Any``; used in this file
# to annotate callbacks that receive a connection (the user retriever and the guard).
AnyConnection: TypeAlias = ASGIConnection[Any, Any, Any, Any]
class User(msgspec.Struct):
    """A user record as stored in ``MOCK_DB`` and exposed as ``request.user``."""

    # Login name; also the key the record is stored under in MOCK_DB.
    name: str
    # Role names the guards check against. Marked as a private DTO field; the
    # login tests in this file expect payloads that carry only ``name``.
    roles: Annotated[Set[str], DTOField(mark=Mark.PRIVATE)]
# In-memory stand-in for a user store, keyed by user name. Keys are derived from
# each record's ``name`` so the key and the record cannot drift apart.
MOCK_DB: Dict[str, User] = {
    user.name: user
    for user in (
        User(name="only-admin", roles={"admin"}),
        User(name="admin-and-customer", roles={"admin", "customer"}),
        User(name="only-customer", roles={"customer"}),
    )
}
async def retrieve_user_handler(session: Dict[str, Any], _: AnyConnection) -> Optional[User]:
    """Resolve the session's ``user_name`` entry to a ``User`` from ``MOCK_DB``.

    Args:
        session: The session data for the current connection.
        _: The connection itself; unused here.

    Returns:
        The matching ``User``, or ``None`` when the session holds no user name
        or the name is not present in ``MOCK_DB``.
    """
    # A missing/empty name is looked up as "" which is never a key in MOCK_DB.
    lookup_key = session.get("user_name") or ""
    return MOCK_DB.get(lookup_key)
@post("/login", dto=MsgspecDTO[User])
async def login(data: DTOData[User], request: Request[Any, Any, Any]) -> User:
    """Log a user in by name and initialise their session.

    Args:
        data: DTO-wrapped request payload; only its ``name`` entry is read.
        request: The current request, whose session is replaced on success.

    Returns:
        The ``User`` record found in ``MOCK_DB``.

    Raises:
        NotAuthorizedException: If the supplied name is not in ``MOCK_DB``.
    """
    requested_name = data.as_builtins()["name"]
    try:
        account = MOCK_DB[requested_name]
    except KeyError as exc:
        raise NotAuthorizedException from exc

    # One per-role bucket each, so admin and customer state are tracked separately
    # inside the single shared session.
    fresh_session = {
        "user_name": account.name,
        "admin_data": {"hits": 0},
        "customer_data": {"hits": 0},
    }
    request.set_session(fresh_session)
    return account
def create_guard(permissions: Set[str]) -> Guard:
    """Build a guard that admits only users holding every role in ``permissions``.

    Args:
        permissions: Role names the connecting user must all possess. An empty
            set admits any truthy (authenticated) user, since the empty set is
            a subset of every role set.

    Returns:
        A guard callable that raises ``PermissionDeniedException`` when the
        connection has no user or the user lacks any of the required roles.
    """
    # Snapshot the roles: the closure must not keep a reference to the caller's
    # mutable set, otherwise mutating it later would silently change what this
    # guard enforces.
    required_roles = frozenset(permissions)

    def guard(request: AnyConnection, _: BaseRouteHandler) -> None:
        user = request.user
        if not user or not required_roles.issubset(user.roles):
            raise PermissionDeniedException

    return guard
@get("/customer-only", guards=[create_guard({"customer"})], sync_to_thread=False)
def for_customers(request: Request[User, Any, Any]) -> Dict[str, Any]:
    """Serve the customer-only route and count the visit in the session.

    Increments ``hits`` under the session's ``customer_data`` entry and returns
    the user's name together with the updated count.
    """
    customer_data = request.session["customer_data"]
    customer_data["hits"] += 1
    return {"customer-data-for": request.user.name, "hits": customer_data["hits"]}
@get("/admin-only", guards=[create_guard({"admin"})], sync_to_thread=False)
def for_admins(request: Request[User, Any, Any]) -> Dict[str, Any]:
    """Serve the admin-only route and count the visit in the session.

    Increments ``hits`` under the session's ``admin_data`` entry and returns
    the user's name together with the updated count.
    """
    admin_data = request.session["admin_data"]
    admin_data["hits"] += 1
    return {"admin-data-for": request.user.name, "hits": admin_data["hits"]}
# Single session-based auth setup shared by admins and customers; route access
# is then narrowed per handler via guards.
session_auth = SessionAuth[User, ClientSideSessionBackend](
    # "/login" is excluded so it can be called before any session exists.
    exclude=["/login"],
    # NOTE(review): hard-coded secret, presumably for this example only; load it
    # from configuration in a real deployment.
    session_backend_config=CookieBackendConfig(secret=b"the-super-secret"),
    retrieve_user_handler=retrieve_user_handler,
)
# Application wiring: the three handlers plus the session auth hook.
app = Litestar(
    # NOTE(review): debug mode is switched on; presumably for this example only.
    debug=True,
    # No OpenAPI configuration is supplied for this example app.
    openapi_config=None,
    on_app_init=[session_auth.on_app_init],
    route_handlers=[login, for_admins, for_customers],
)
def test_no_user() -> None:
    """Without logging in, both protected routes answer 401."""
    with TestClient(app) as client:
        for path in ("/admin-only", "/customer-only"):
            assert client.get(path).status_code == 401
def test_admin_only_user() -> None:
    """An admin-only user reaches the admin route and gets 403 on the customer route."""
    with TestClient(app) as client:
        login_response = client.post("/login", json={"name": "only-admin"})
        assert login_response.json() == {"name": "only-admin"}

        admin_response = client.get("/admin-only")
        assert admin_response.status_code == 200
        assert admin_response.json() == {"admin-data-for": "only-admin", "hits": 1}

        customer_response = client.get("/customer-only")
        assert customer_response.status_code == 403
def test_customer_only_user() -> None:
    """A customer-only user gets 403 on the admin route and reaches the customer route."""
    with TestClient(app) as client:
        login_response = client.post("/login", json={"name": "only-customer"})
        assert login_response.json() == {"name": "only-customer"}

        admin_response = client.get("/admin-only")
        assert admin_response.status_code == 403

        customer_response = client.get("/customer-only")
        assert customer_response.status_code == 200
        assert customer_response.json() == {"customer-data-for": "only-customer", "hits": 1}
def test_admin_and_customer_user() -> None:
    """A user holding both roles reaches both routes, each with its own hit counter."""
    with TestClient(app) as client:
        login_response = client.post("/login", json={"name": "admin-and-customer"})
        assert login_response.json() == {"name": "admin-and-customer"}

        admin_response = client.get("/admin-only")
        assert admin_response.status_code == 200
        assert admin_response.json() == {"admin-data-for": "admin-and-customer", "hits": 1}

        customer_response = client.get("/customer-only")
        assert customer_response.status_code == 200
        assert customer_response.json() == {"customer-data-for": "admin-and-customer", "hits": 1}
By this, I assume that you mean you'd have the admin routes organized in a separate directory from the customer routes. In this case, you could register the guard on a router that represented the tree of routes for that permission group, instead of directly on the handler like in the example. |
Beta Was this translation helpful? Give feedback.
-
I think the following behaves as I expect, although I am not sure whether there is a better way or whether I am doing it "by the book".
|
Beta Was this translation helpful? Give feedback.
I think the following behaves as I expect, although I am not sure whether there is a better way or whether I am doing it "by the book".