From b55c7fdca1efeef3ea702991d7a7782b1944c755 Mon Sep 17 00:00:00 2001 From: Allisson Azevedo Date: Wed, 5 Oct 2022 08:27:10 -0300 Subject: [PATCH] feat: add endpoint to purge the queue messages (#22) --- fastqueue/api.py | 10 ++++++++++ fastqueue/services.py | 20 +++++++++++++------- tests/test_api.py | 33 +++++++++++++++++++++++++++++++++ tests/test_services.py | 12 ++++++++++++ 4 files changed, 68 insertions(+), 7 deletions(-) diff --git a/fastqueue/api.py b/fastqueue/api.py index bac9580..e29f11b 100644 --- a/fastqueue/api.py +++ b/fastqueue/api.py @@ -130,6 +130,16 @@ def get_queue_stats(queue_id: str, session: Session = Depends(get_session)): return QueueService.stats(id=queue_id, session=session) +@app.put( + "/queues/{queue_id}/purge", + status_code=status.HTTP_204_NO_CONTENT, + tags=["queues"], + responses={404: {"model": NotFoundSchema}}, +) +def purge_queue_messages(queue_id: str, session: Session = Depends(get_session)): + return QueueService.purge(id=queue_id, session=session) + + @app.delete( "/queues/{queue_id}", status_code=status.HTTP_204_NO_CONTENT, diff --git a/fastqueue/services.py b/fastqueue/services.py index 5cea3fd..f6475cb 100644 --- a/fastqueue/services.py +++ b/fastqueue/services.py @@ -85,9 +85,9 @@ def list( @classmethod def delete(cls, id: str, session: Session) -> None: - get_model(model=Topic, filters={"id": id}, session=session) - session.query(Queue).filter_by(topic_id=id).update({"topic_id": None}) - session.query(Topic).filter_by(id=id).delete() + topic = get_model(model=Topic, filters={"id": id}, session=session) + session.query(Queue).filter_by(topic_id=topic.id).update({"topic_id": None}) + session.query(Topic).filter_by(id=topic.id).delete() session.commit() @@ -156,10 +156,10 @@ def list( @classmethod def delete(cls, id: str, session: Session) -> None: - get_model(model=Queue, filters={"id": id}, session=session) - session.query(Message).filter_by(queue_id=id).delete() - session.query(Queue).filter_by(dead_queue_id=id).update({"dead_queue_id": None}) - session.query(Queue).filter_by(id=id).delete() + queue = get_model(model=Queue, filters={"id": id}, session=session) + session.query(Message).filter_by(queue_id=queue.id).delete() + session.query(Queue).filter_by(dead_queue_id=queue.id).update({"dead_queue_id": None}) + session.query(Queue).filter_by(id=queue.id).delete() session.commit() @classmethod @@ -182,6 +182,12 @@ def stats(cls, id: str, session: Session) -> QueueStatsSchema: oldest_unacked_message_age_seconds=oldest_unacked_message_age_seconds, ) + @classmethod + def purge(cls, id: str, session: Session) -> None: + queue = get_model(model=Queue, filters={"id": id}, session=session) + session.query(Message).filter_by(queue_id=queue.id).delete() + session.commit() + @classmethod def _cleanup_expired_messages(cls, queue: QueueSchema, session: Session) -> None: now = datetime.utcnow() diff --git a/tests/test_api.py b/tests/test_api.py index ac8243e..b4efc47 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -169,6 +169,39 @@ def test_get_queue_stats(session, queue, client): assert response_data == {"num_undelivered_messages": 5, "oldest_unacked_message_age_seconds": 10} +def test_get_queue_stats_not_found(session, client): + response = client.get("/queues/not-found-queue/stats") + response_data = response.json() + + assert response.status_code == status.HTTP_404_NOT_FOUND + assert response_data == {"detail": "Queue not found"} + + +def test_purge_queue_messages(session, queue, client): + created_at = datetime.utcnow() - timedelta(seconds=10) + messages = MessageFactory.build_batch(5, queue_id=queue.id, created_at=created_at) + for message in messages: + session.add(message) + session.commit() + + response = client.put(f"/queues/{queue.id}/purge") + assert response.status_code == status.HTTP_204_NO_CONTENT + + response = client.get(f"/queues/{queue.id}/stats") + response_data = response.json() + + assert response.status_code == status.HTTP_200_OK + assert response_data == {"num_undelivered_messages": 0, "oldest_unacked_message_age_seconds": 0} + + +def test_purge_queue_messages_not_found(session, client): + response = client.put("/queues/not-found-queue/purge") + response_data = response.json() + + assert response.status_code == status.HTTP_404_NOT_FOUND + assert response_data == {"detail": "Queue not found"} + + def test_delete_queue(session, queue, client): response = client.delete(f"/queues/{queue.id}") diff --git a/tests/test_services.py b/tests/test_services.py index 71e4471..ad39d99 100644 --- a/tests/test_services.py +++ b/tests/test_services.py @@ -184,6 +184,18 @@ def test_queue_service_stats(session, queue): assert result.oldest_unacked_message_age_seconds == 10 +def test_queue_service_purge(session, queue): + created_at = datetime.utcnow() - timedelta(seconds=10) + messages = MessageFactory.build_batch(5, queue_id=queue.id, created_at=created_at) + for message in messages: + session.add(message) + session.commit() + assert session.query(Message).filter_by(queue_id=queue.id).count() == 5 + + assert QueueService.purge(id=queue.id, session=session) is None + assert session.query(Message).filter_by(queue_id=queue.id).count() == 0 + + def test_queue_service_cleanup_expired_at(session, queue): message1 = MessageFactory(queue_id=queue.id, expired_at=datetime.utcnow() - timedelta(seconds=1)) message2 = MessageFactory(queue_id=queue.id, expired_at=datetime.utcnow() + timedelta(seconds=1))