Skip to content

Commit

Permalink
refactor: update services to use get_model (#20)
Browse files Browse the repository at this point in the history
  • Loading branch information
allisson authored Oct 3, 2022
1 parent 9c31692 commit 912c425
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 170 deletions.
36 changes: 18 additions & 18 deletions fastqueue/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ def list_model(
def get_model(model: Any, filters: dict | None, session: Session) -> Any:
query = session.query(model)
query = apply_basic_filters(query=query, filters=filters, offset=None, limit=None)
return query.first()
instance = query.first()
if instance is None:
raise NotFoundError(f"{model.__name__} not found")
return instance


class TopicService:
Expand All @@ -69,8 +72,6 @@ def create(cls, data: CreateTopicSchema, session: Session) -> TopicSchema:
@classmethod
def get(cls, id: str, session: Session) -> TopicSchema:
topic = get_model(model=Topic, filters={"id": id}, session=session)
if topic is None:
raise NotFoundError("Topic not found")
return TopicSchema.from_orm(topic)

@classmethod
Expand All @@ -84,7 +85,7 @@ def list(

@classmethod
def delete(cls, id: str, session: Session) -> None:
cls.get(id, session=session)
get_model(model=Topic, filters={"id": id}, session=session)
session.query(Queue).filter_by(topic_id=id).update({"topic_id": None})
session.query(Topic).filter_by(id=id).delete()
session.commit()
Expand All @@ -94,10 +95,10 @@ class QueueService:
@classmethod
def create(cls, data: CreateQueueSchema, session: Session) -> QueueSchema:
if data.topic_id is not None:
TopicService.get(data.topic_id, session=session)
get_model(model=Topic, filters={"id": data.topic_id}, session=session)

if data.dead_queue_id is not None:
cls.get(data.dead_queue_id, session=session)
get_model(model=Queue, filters={"id": data.dead_queue_id}, session=session)

now = datetime.utcnow()
queue = Queue(
Expand All @@ -121,14 +122,12 @@ def create(cls, data: CreateQueueSchema, session: Session) -> QueueSchema:
@classmethod
def update(cls, id: str, data: UpdateQueueSchema, session: Session) -> QueueSchema:
queue = get_model(model=Queue, filters={"id": id}, session=session)
if queue is None:
raise NotFoundError("Queue not found")

if data.topic_id is not None:
TopicService.get(data.topic_id, session=session)
get_model(model=Topic, filters={"id": data.topic_id}, session=session)

if data.dead_queue_id is not None:
cls.get(data.dead_queue_id, session=session)
get_model(model=Queue, filters={"id": data.dead_queue_id}, session=session)

queue.topic_id = data.topic_id
queue.dead_queue_id = data.dead_queue_id
Expand All @@ -144,8 +143,6 @@ def update(cls, id: str, data: UpdateQueueSchema, session: Session) -> QueueSche
@classmethod
def get(cls, id: str, session: Session) -> QueueSchema:
queue = get_model(model=Queue, filters={"id": id}, session=session)
if queue is None:
raise NotFoundError("Queue not found")
return QueueSchema.from_orm(queue)

@classmethod
Expand All @@ -159,15 +156,16 @@ def list(

@classmethod
def delete(cls, id: str, session: Session) -> None:
cls.get(id, session=session)
get_model(model=Queue, filters={"id": id}, session=session)
session.query(Message).filter_by(queue_id=id).delete()
session.query(Queue).filter_by(dead_queue_id=id).update({"dead_queue_id": None})
session.query(Queue).filter_by(id=id).delete()
session.commit()

@classmethod
def stats(cls, id: str, session: Session) -> QueueStatsSchema:
queue = cls.get(id=id, session=session)
queue = get_model(model=Queue, filters={"id": id}, session=session)

now = datetime.utcnow()
filters = [Message.queue_id == queue.id, Message.expired_at >= now, Message.scheduled_at <= now]
if queue.message_max_deliveries is not None:
Expand Down Expand Up @@ -196,7 +194,8 @@ def _cleanup_move_messages_to_dead_queue(cls, queue: QueueSchema, session: Sessi
if queue.message_max_deliveries is None or queue.dead_queue_id is None:
return

dead_queue = cls.get(id=queue.dead_queue_id, session=session)
dead_queue = get_model(model=Queue, filters={"id": queue.dead_queue_id}, session=session)

delivery_attempts_filter = [
Message.queue_id == queue.id,
Message.delivery_attempts >= queue.message_max_deliveries,
Expand All @@ -213,7 +212,7 @@ def _cleanup_move_messages_to_dead_queue(cls, queue: QueueSchema, session: Sessi

@classmethod
def cleanup(cls, id: str, session: Session) -> None:
queue = cls.get(id=id, session=session)
queue = get_model(model=Queue, filters={"id": id}, session=session)

cls._cleanup_expired_messages(queue=queue, session=session)
cls._cleanup_move_messages_to_dead_queue(queue=queue, session=session)
Expand Down Expand Up @@ -298,8 +297,9 @@ def ack(cls, id: str, session: Session) -> None:

@classmethod
def nack(cls, id: str, session: Session) -> None:
message = get_model(model=Message, filters={"id": id}, session=session)
if message is None:
try:
message = get_model(model=Message, filters={"id": id}, session=session)
except NotFoundError:
return

now = datetime.utcnow()
Expand Down
2 changes: 1 addition & 1 deletion fastqueue/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from fastqueue.services import QueueService

logger = get_logger(__name__)
worker = Rocketry()
worker = Rocketry(execution="main")


@worker.task(every(f"{settings.queue_cleanup_interval_seconds} seconds"))
Expand Down
Loading

0 comments on commit 912c425

Please sign in to comment.