Skip to content

Commit

Permalink
feat: add queues and messages endpoints (#12)
Browse files Browse the repository at this point in the history
* feat: add queues and messages endpoints

* test: add tests for api

* test: use datetime.utcnow
  • Loading branch information
allisson authored Sep 7, 2022
1 parent f8dd58a commit be80f17
Show file tree
Hide file tree
Showing 9 changed files with 349 additions and 91 deletions.
123 changes: 92 additions & 31 deletions fastqueue/api.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,44 @@
import uvicorn
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi import Depends, FastAPI, Request, status
from fastapi.responses import JSONResponse
from sqlalchemy.orm import Session

from fastqueue.config import settings
from fastqueue.database import SessionLocal
from fastqueue.exceptions import AlreadyExistsError, NotFoundError
from fastqueue.schemas import CreateQueueSchema, CreateTopicSchema, ListTopicSchema, QueueSchema, TopicSchema
from fastqueue.services import QueueService, TopicService

app = FastAPI(debug=settings.debug)
from fastqueue.schemas import (
CreateMessageSchema,
CreateQueueSchema,
CreateTopicSchema,
ListMessageSchema,
ListQueueSchema,
ListTopicSchema,
QueueSchema,
TopicSchema,
UpdateQueueSchema,
)
from fastqueue.services import MessageService, QueueService, TopicService

tags_metadata = [
{
"name": "topics",
"description": "Operations with topics.",
},
{
"name": "queues",
"description": "Operations with queues.",
},
{
"name": "messages",
"description": "Operations with messages.",
},
]
app = FastAPI(
title="Fast Queue",
description="Simple queue system based on FastAPI and PostgreSQL.",
debug=settings.debug,
openapi_tags=tags_metadata,
)


def get_session():
Expand All @@ -19,48 +49,79 @@ def get_session():
db.close()


@app.get("/")
def read_root():
return {"Hello": "World"}
@app.exception_handler(AlreadyExistsError)
def already_exists_exception_handler(request: Request, exc: AlreadyExistsError):
return JSONResponse(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, content={"detail": exc.args[0]})


@app.exception_handler(NotFoundError)
def not_found_exception_handler(request: Request, exc: NotFoundError):
return JSONResponse(status_code=status.HTTP_404_NOT_FOUND, content={"detail": exc.args[0]})


@app.post("/topics", response_model=TopicSchema, status_code=status.HTTP_201_CREATED)
@app.post("/topics", response_model=TopicSchema, status_code=status.HTTP_201_CREATED, tags=["topics"])
def create_topic(data: CreateTopicSchema, session: Session = Depends(get_session)):
try:
return TopicService.create(data=data, session=session)
except AlreadyExistsError as exc:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=exc.args[0])
return TopicService.create(data=data, session=session)


@app.get("/topics/{topic_id}", response_model=TopicSchema, status_code=status.HTTP_200_OK)
@app.get("/topics/{topic_id}", response_model=TopicSchema, status_code=status.HTTP_200_OK, tags=["topics"])
def get_topic(topic_id: str, session: Session = Depends(get_session)):
try:
return TopicService.get(id=topic_id, session=session)
except NotFoundError as exc:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=exc.args[0])
return TopicService.get(id=topic_id, session=session)


@app.delete("/topics/{topic_id}", status_code=status.HTTP_204_NO_CONTENT)
@app.delete("/topics/{topic_id}", status_code=status.HTTP_204_NO_CONTENT, tags=["topics"])
def delete_topic(topic_id: str, session: Session = Depends(get_session)):
try:
return TopicService.delete(id=topic_id, session=session)
except NotFoundError as exc:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=exc.args[0])
return TopicService.delete(id=topic_id, session=session)


@app.get("/topics", response_model=ListTopicSchema, status_code=status.HTTP_200_OK)
@app.get("/topics", response_model=ListTopicSchema, status_code=status.HTTP_200_OK, tags=["topics"])
def list_topics(offset: int = 0, limit: int = 10, session: Session = Depends(get_session)):
return TopicService.list(filters=None, offset=offset, limit=limit, session=session)


@app.post("/queues", response_model=QueueSchema, status_code=status.HTTP_201_CREATED)
@app.post("/queues", response_model=QueueSchema, status_code=status.HTTP_201_CREATED, tags=["queues"])
def create_queue(data: CreateQueueSchema, session: Session = Depends(get_session)):
try:
return QueueService.create(data=data, session=session)
except NotFoundError as exc:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=exc.args[0])
except AlreadyExistsError as exc:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=exc.args[0])
return QueueService.create(data=data, session=session)


@app.get("/queues/{queue_id}", response_model=QueueSchema, status_code=status.HTTP_200_OK, tags=["queues"])
def get_queue(queue_id: str, session: Session = Depends(get_session)):
return QueueService.get(id=queue_id, session=session)


@app.put("/queues/{queue_id}", response_model=QueueSchema, status_code=status.HTTP_200_OK, tags=["queues"])
def update_queue(queue_id: str, data: UpdateQueueSchema, session: Session = Depends(get_session)):
return QueueService.update(id=queue_id, data=data, session=session)


@app.delete("/queues/{queue_id}", status_code=status.HTTP_204_NO_CONTENT, tags=["queues"])
def delete_queue(queue_id: str, session: Session = Depends(get_session)):
return QueueService.delete(id=queue_id, session=session)


@app.get("/queues", response_model=ListQueueSchema, status_code=status.HTTP_200_OK, tags=["queues"])
def list_queues(offset: int = 0, limit: int = 10, session: Session = Depends(get_session)):
return QueueService.list(filters=None, offset=offset, limit=limit, session=session)


@app.post(
"/topics/{topic_id}/messages",
response_model=ListMessageSchema,
status_code=status.HTTP_201_CREATED,
tags=["messages"],
)
def create_message(topic_id: str, data: CreateMessageSchema, session: Session = Depends(get_session)):
return MessageService.create(topic_id=topic_id, data=data, session=session)


@app.get(
"/queues/{queue_id}/messages",
response_model=ListMessageSchema,
status_code=status.HTTP_200_OK,
tags=["messages"],
)
def list_messages_for_consume(queue_id: str, limit: int = 10, session: Session = Depends(get_session)):
return MessageService.list_for_consume(queue_id=queue_id, limit=limit, session=session)


def run_server():
Expand Down
11 changes: 8 additions & 3 deletions fastqueue/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@


class Settings(BaseSettings):
# testing settings
testing: bool = False

# log settings
log_formatter: str = (
"asctime=%(asctime)s level=%(levelname)s pathname=%(pathname)s line=%(lineno)s message=%(message)s"
Expand All @@ -22,6 +19,14 @@ class Settings(BaseSettings):
# postgresql settings
database_url: str

# queue settings
min_ack_deadline_seconds: int = 1
max_ack_deadline_seconds: int = 600
min_message_retention_seconds: int = 600
max_message_retention_seconds: int = 1209600
min_message_max_deliveries: int = 1
max_message_max_deliveries: int = 1000

class Config:
env_file = ".env"
env_file_encoding = "utf-8"
Expand Down
33 changes: 23 additions & 10 deletions fastqueue/schemas.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from datetime import datetime
from typing import Any
from uuid import UUID

from pydantic import BaseModel as Schema
from pydantic import Field

from fastqueue.config import settings

regex_for_id = "^[a-zA-Z0-9-._]+$"


Expand All @@ -27,26 +28,38 @@ class ListTopicSchema(Schema):
class CreateQueueSchema(Schema):
id: str = Field(..., regex=regex_for_id, max_length=128)
topic_id: str | None = Field(None, regex=regex_for_id, max_length=128)
ack_deadline_seconds: int = Field(..., ge=1, le=600)
message_retention_seconds: int = Field(..., ge=600, le=1209600)
message_filters: dict[str, list[Any]] | None = None
message_max_deliveries: int | None = Field(None, ge=1, le=1000)
ack_deadline_seconds: int = Field(
..., ge=settings.min_ack_deadline_seconds, le=settings.max_ack_deadline_seconds
)
message_retention_seconds: int = Field(
..., ge=settings.min_message_retention_seconds, le=settings.max_message_retention_seconds
)
message_filters: dict[str, list[str]] | None = None
message_max_deliveries: int | None = Field(
None, ge=settings.min_message_max_deliveries, le=settings.max_message_max_deliveries
)


class UpdateQueueSchema(Schema):
topic_id: str | None = Field(None, regex=regex_for_id, max_length=128)
ack_deadline_seconds: int = Field(..., ge=1, le=600)
message_retention_seconds: int = Field(..., ge=600, le=1209600)
message_filters: dict[str, list[Any]] | None = None
message_max_deliveries: int | None = Field(None, ge=1, le=1000)
ack_deadline_seconds: int = Field(
..., ge=settings.min_ack_deadline_seconds, le=settings.max_ack_deadline_seconds
)
message_retention_seconds: int = Field(
..., ge=settings.min_message_retention_seconds, le=settings.max_message_retention_seconds
)
message_filters: dict[str, list[str]] | None = None
message_max_deliveries: int | None = Field(
None, ge=settings.min_message_max_deliveries, le=settings.max_message_max_deliveries
)


class QueueSchema(Schema):
id: str
topic_id: str | None
ack_deadline_seconds: int
message_retention_seconds: int
message_filters: dict[str, list[Any]] | None
message_filters: dict[str, list[str]] | None
message_max_deliveries: int | None
created_at: datetime
updated_at: datetime
Expand Down
24 changes: 6 additions & 18 deletions fastqueue/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,13 @@ def create(cls, data: CreateQueueSchema, session: Session) -> QueueSchema:

@classmethod
def update(cls, id: str, data: UpdateQueueSchema, session: Session) -> QueueSchema:
queue = get_model(model=Queue, filters={"id": id}, session=session)
if queue is None:
raise NotFoundError("Queue not found")

if data.topic_id is not None:
TopicService.get(data.topic_id, session=session)

queue = get_model(model=Queue, filters={"id": id}, session=session)
queue.topic_id = data.topic_id
queue.ack_deadline_seconds = data.ack_deadline_seconds
queue.message_retention_seconds = data.message_retention_seconds
Expand All @@ -133,7 +136,7 @@ def update(cls, id: str, data: UpdateQueueSchema, session: Session) -> QueueSche
def get(cls, id: str, session: Session) -> QueueSchema:
queue = get_model(model=Queue, filters={"id": id}, session=session)
if queue is None:
raise NotFoundError("queue not found")
raise NotFoundError("Queue not found")
return QueueSchema.from_orm(queue)

@classmethod
Expand Down Expand Up @@ -174,7 +177,7 @@ def _should_message_be_created_on_queue(

@classmethod
def create(cls, topic_id: str, data: CreateMessageSchema, session: Session) -> ListMessageSchema:
result = ListMessageSchema(data=[], offset=None, limit=None)
result = ListMessageSchema(data=[])
topic = TopicService.get(topic_id, session=session)
queues = QueueService.list(filters={"topic_id": topic.id}, offset=None, limit=None, session=session)
if not queues.data:
Expand Down Expand Up @@ -204,26 +207,11 @@ def create(cls, topic_id: str, data: CreateMessageSchema, session: Session) -> L

@classmethod
def get(cls, id: str, session: Session) -> MessageSchema:
# message = cls.get_model(id=id, session=session)
message = get_model(model=Message, filters={"id": id}, session=session)
if message is None:
raise NotFoundError("message not found")
return MessageSchema.from_orm(message)

@classmethod
def list(
cls, filters: dict | None, offset: int | None, limit: int | None, session: Session
) -> ListMessageSchema:
messages = list_model(
model=Message,
filters=filters,
offset=offset,
limit=limit,
order_by=Message.created_at,
session=session,
)
return ListMessageSchema(data=[MessageSchema.from_orm(message) for message in messages])

@classmethod
def list_for_consume(cls, queue_id: str, limit: int, session: Session) -> ListMessageSchema:
queue = QueueService.get(id=queue_id, session=session)
Expand Down
7 changes: 5 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from fastqueue.api import app
from fastqueue.database import Base, engine, SessionLocal
from fastqueue.models import Message, Queue, Topic
from tests.factories import MessageFactory, QueueFactory, TopicFactory


Expand All @@ -22,11 +23,13 @@ def connection():

@pytest.fixture(scope="function")
def session(connection):
transaction = connection.begin()
session = SessionLocal(bind=connection)
yield session
session.query(Message).delete()
session.query(Queue).delete()
session.query(Topic).delete()
session.commit()
session.close()
transaction.rollback()


@pytest.fixture
Expand Down
6 changes: 1 addition & 5 deletions tests/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,6 @@ def expired_at():
return datetime.utcnow() + timedelta(seconds=default_message_retention_seconds)


def scheduled_at():
return datetime.utcnow() + timedelta(seconds=default_ack_deadline_seconds)


class TopicFactory(factory.Factory):
class Meta:
model = Topic
Expand Down Expand Up @@ -44,6 +40,6 @@ class Meta:
data = {"message": "Hello"}
delivery_attempts = 0
expired_at = factory.LazyFunction(expired_at)
scheduled_at = factory.LazyFunction(scheduled_at)
scheduled_at = factory.LazyFunction(datetime.utcnow)
created_at = factory.LazyFunction(datetime.utcnow)
updated_at = factory.LazyFunction(datetime.utcnow)
Loading

0 comments on commit be80f17

Please sign in to comment.