Skip to content

Commit

Permalink
feat: add topics endpoints (#11)
Browse files Browse the repository at this point in the history
* feat: add topics endpoints

* test: update python version
  • Loading branch information
allisson authored Sep 7, 2022
1 parent 901bd1b commit f8dd58a
Show file tree
Hide file tree
Showing 9 changed files with 184 additions and 153 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/lint-and-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ jobs:
with:
path: ~/.cache
key: self-runner-${{ runner.os }}-python-3.10-poetry-${{ hashFiles('poetry.lock') }}-precommit-${{ hashFiles('.pre-commit-config.yaml') }}
- name: Set up Python 3.10.6
- name: Set up Python 3.10.7
uses: actions/setup-python@v4
with:
python-version: 3.10.6
python-version: 3.10.7
- name: Install dependencies
run: |
cp env.sample .env
Expand Down
6 changes: 3 additions & 3 deletions alembic/versions/001_initial.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
"""Auto generated
Revision ID: 2185874053e3
Revision ID: b290fd728c17
Revises:
Create Date: 2022-09-06 16:17:25.031314
Create Date: 2022-09-06 21:44:14.941396
"""
import sqlalchemy as sa
Expand All @@ -11,7 +11,7 @@
from alembic import op

# revision identifiers, used by Alembic.
revision = "2185874053e3"
revision = "b290fd728c17"
down_revision = None
branch_labels = None
depends_on = None
Expand Down
54 changes: 53 additions & 1 deletion fastqueue/api.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,68 @@
import uvicorn
from fastapi import FastAPI
from fastapi import Depends, FastAPI, HTTPException, status
from sqlalchemy.orm import Session

from fastqueue.config import settings
from fastqueue.database import SessionLocal
from fastqueue.exceptions import AlreadyExistsError, NotFoundError
from fastqueue.schemas import CreateQueueSchema, CreateTopicSchema, ListTopicSchema, QueueSchema, TopicSchema
from fastqueue.services import QueueService, TopicService

app = FastAPI(debug=settings.debug)


def get_session():
db = SessionLocal()
try:
yield db
finally:
db.close()


@app.get("/")
def read_root():
return {"Hello": "World"}


@app.post("/topics", response_model=TopicSchema, status_code=status.HTTP_201_CREATED)
def create_topic(data: CreateTopicSchema, session: Session = Depends(get_session)):
try:
return TopicService.create(data=data, session=session)
except AlreadyExistsError as exc:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=exc.args[0])


@app.get("/topics/{topic_id}", response_model=TopicSchema, status_code=status.HTTP_200_OK)
def get_topic(topic_id: str, session: Session = Depends(get_session)):
try:
return TopicService.get(id=topic_id, session=session)
except NotFoundError as exc:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=exc.args[0])


@app.delete("/topics/{topic_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_topic(topic_id: str, session: Session = Depends(get_session)):
try:
return TopicService.delete(id=topic_id, session=session)
except NotFoundError as exc:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=exc.args[0])


@app.get("/topics", response_model=ListTopicSchema, status_code=status.HTTP_200_OK)
def list_topics(offset: int = 0, limit: int = 10, session: Session = Depends(get_session)):
return TopicService.list(filters=None, offset=offset, limit=limit, session=session)


@app.post("/queues", response_model=QueueSchema, status_code=status.HTTP_201_CREATED)
def create_queue(data: CreateQueueSchema, session: Session = Depends(get_session)):
try:
return QueueService.create(data=data, session=session)
except NotFoundError as exc:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=exc.args[0])
except AlreadyExistsError as exc:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=exc.args[0])


def run_server():
uvicorn.run(
"fastqueue.api:app",
Expand Down
4 changes: 4 additions & 0 deletions fastqueue/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
class NotFoundError(Exception):
pass


class AlreadyExistsError(Exception):
pass
11 changes: 3 additions & 8 deletions fastqueue/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ class ListTopicSchema(Schema):
class CreateQueueSchema(Schema):
id: str = Field(..., regex=regex_for_id, max_length=128)
topic_id: str | None = Field(None, regex=regex_for_id, max_length=128)
ack_deadline_seconds: int = Field(30, ge=0, le=600)
message_retention_seconds: int = Field(1209600, ge=600, le=1209600)
ack_deadline_seconds: int = Field(..., ge=1, le=600)
message_retention_seconds: int = Field(..., ge=600, le=1209600)
message_filters: dict[str, list[Any]] | None = None
message_max_deliveries: int | None = Field(None, ge=1, le=1000)


class UpdateQueueSchema(Schema):
topic_id: str | None = Field(None, regex=regex_for_id, max_length=128)
ack_deadline_seconds: int = Field(..., ge=0, le=600)
ack_deadline_seconds: int = Field(..., ge=1, le=600)
message_retention_seconds: int = Field(..., ge=600, le=1209600)
message_filters: dict[str, list[Any]] | None = None
message_max_deliveries: int | None = Field(None, ge=1, le=1000)
Expand Down Expand Up @@ -64,11 +64,6 @@ class CreateMessageSchema(Schema):
attributes: dict | None = None


class UpdateMessageSchema(Schema):
delivery_attempts: int
scheduled_at: datetime


class MessageSchema(Schema):
id: UUID
queue_id: str
Expand Down
99 changes: 59 additions & 40 deletions fastqueue/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
from datetime import datetime, timedelta
from typing import Any

from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session

from fastqueue.exceptions import NotFoundError
from fastqueue.exceptions import AlreadyExistsError, NotFoundError
from fastqueue.models import Message, Queue, Topic
from fastqueue.schemas import (
CreateMessageSchema,
Expand All @@ -16,51 +17,74 @@
MessageSchema,
QueueSchema,
TopicSchema,
UpdateMessageSchema,
UpdateQueueSchema,
)


def apply_basic_filters(query: Any, filters: dict | None, offset: int | None, limit: int | None) -> Any:
def apply_basic_filters(
query: Any, filters: dict | None, offset: int | None, limit: int | None, order_by: Any | None = None
) -> Any:
if filters is not None:
query = query.filter_by(**filters)
if order_by is not None:
query = query.order_by(order_by)
if offset is not None:
query = query.offset(offset)
if limit is not None:
query = query.limit(limit)
return query


def list_model(
model: Any,
filters: dict | None,
offset: int | None,
limit: int | None,
order_by: Any | None,
session: Session,
) -> Any:
query = session.query(model)
query = apply_basic_filters(query=query, filters=filters, offset=offset, limit=limit, order_by=order_by)
return query.all()


def get_model(model: Any, filters: dict | None, session: Session) -> Any:
query = session.query(model)
query = apply_basic_filters(query=query, filters=filters, offset=None, limit=None)
return query.first()


class TopicService:
@classmethod
def create(cls, data: CreateTopicSchema, session: Session) -> TopicSchema:
topic = Topic(id=data.id, created_at=datetime.utcnow())
session.add(topic)
session.commit()
try:
session.commit()
except IntegrityError:
raise AlreadyExistsError("This topic already exists")
return TopicSchema.from_orm(topic)

@classmethod
def get_model(cls, id: str, session: Session) -> Topic | None:
return session.query(Topic).filter_by(id=id).first()

@classmethod
def get(cls, id: str, session: Session) -> TopicSchema:
topic = cls.get_model(id=id, session=session)
topic = get_model(model=Topic, filters={"id": id}, session=session)
if topic is None:
raise NotFoundError("topic not found")
raise NotFoundError("Topic not found")
return TopicSchema.from_orm(topic)

@classmethod
def list(
cls, filters: dict | None, offset: int | None, limit: int | None, session: Session
) -> ListTopicSchema:
topics = session.query(Topic).order_by(Topic.id)
topics = apply_basic_filters(topics, filters, offset, limit)
return ListTopicSchema(data=[TopicSchema.from_orm(topic) for topic in topics.all()])
topics = list_model(
model=Topic, filters=filters, offset=offset, limit=limit, order_by=Topic.id, session=session
)
return ListTopicSchema(data=[TopicSchema.from_orm(topic) for topic in topics])

@classmethod
def delete(cls, id: str, session: Session) -> None:
cls.get(id, session=session)
session.query(Queue).filter_by(topic_id=id).update({"topic_id": None})
session.query(Topic).filter_by(id=id).delete()
session.commit()

Expand All @@ -83,15 +107,18 @@ def create(cls, data: CreateQueueSchema, session: Session) -> QueueSchema:
updated_at=now,
)
session.add(queue)
session.commit()
try:
session.commit()
except IntegrityError:
raise AlreadyExistsError("This queue already exists")
return QueueSchema.from_orm(queue)

@classmethod
def update(cls, id: str, data: UpdateQueueSchema, session: Session) -> QueueSchema:
if data.topic_id is not None:
TopicService.get(data.topic_id, session=session)

queue = cls.get_model(id, session=session)
queue = get_model(model=Queue, filters={"id": id}, session=session)
queue.topic_id = data.topic_id
queue.ack_deadline_seconds = data.ack_deadline_seconds
queue.message_retention_seconds = data.message_retention_seconds
Expand All @@ -102,13 +129,9 @@ def update(cls, id: str, data: UpdateQueueSchema, session: Session) -> QueueSche
session.commit()
return QueueSchema.from_orm(queue)

@classmethod
def get_model(cls, id: str, session: Session) -> Queue:
return session.query(Queue).filter_by(id=id).first()

@classmethod
def get(cls, id: str, session: Session) -> QueueSchema:
queue = cls.get_model(id=id, session=session)
queue = get_model(model=Queue, filters={"id": id}, session=session)
if queue is None:
raise NotFoundError("queue not found")
return QueueSchema.from_orm(queue)
Expand All @@ -117,13 +140,15 @@ def get(cls, id: str, session: Session) -> QueueSchema:
def list(
cls, filters: dict | None, offset: int | None, limit: int | None, session: Session
) -> ListQueueSchema:
queues = session.query(Queue).order_by(Queue.id)
queues = apply_basic_filters(queues, filters, offset, limit)
return ListQueueSchema(data=[QueueSchema.from_orm(queue) for queue in queues.all()])
queues = list_model(
model=Queue, filters=filters, offset=offset, limit=limit, order_by=Queue.id, session=session
)
return ListQueueSchema(data=[QueueSchema.from_orm(queue) for queue in queues])

@classmethod
def delete(cls, id: str, session: Session) -> None:
cls.get(id, session=session)
session.query(Message).filter_by(queue_id=id).delete()
session.query(Queue).filter_by(id=id).delete()
session.commit()

Expand Down Expand Up @@ -177,22 +202,10 @@ def create(cls, topic_id: str, data: CreateMessageSchema, session: Session) -> L
session.commit()
return result

@classmethod
def update(cls, id: str, data: UpdateMessageSchema, session: Session) -> MessageSchema:
message = cls.get_model(id, session=session)
message.delivery_attempts = data.delivery_attempts
message.scheduled_at = data.scheduled_at
message.updated_at = datetime.utcnow()
session.commit()
return MessageSchema.from_orm(message)

@classmethod
def get_model(cls, id: str, session: Session) -> Message:
return session.query(Message).filter_by(id=id).first()

@classmethod
def get(cls, id: str, session: Session) -> MessageSchema:
message = cls.get_model(id=id, session=session)
# message = cls.get_model(id=id, session=session)
message = get_model(model=Message, filters={"id": id}, session=session)
if message is None:
raise NotFoundError("message not found")
return MessageSchema.from_orm(message)
Expand All @@ -201,9 +214,15 @@ def get(cls, id: str, session: Session) -> MessageSchema:
def list(
cls, filters: dict | None, offset: int | None, limit: int | None, session: Session
) -> ListMessageSchema:
messages = session.query(Message).order_by(Message.created_at)
messages = apply_basic_filters(messages, filters, offset, limit)
return ListMessageSchema(data=[MessageSchema.from_orm(message) for message in messages.all()])
messages = list_model(
model=Message,
filters=filters,
offset=offset,
limit=limit,
order_by=Message.created_at,
session=session,
)
return ListMessageSchema(data=[MessageSchema.from_orm(message) for message in messages])

@classmethod
def list_for_consume(cls, queue_id: str, limit: int, session: Session) -> ListMessageSchema:
Expand Down
Loading

0 comments on commit f8dd58a

Please sign in to comment.