Skip to content

Commit

Permalink
feat: add endpoint to purge the queue messages (#22)
Browse files Browse the repository at this point in the history
  • Loading branch information
allisson authored Oct 5, 2022
1 parent 7bce9a2 commit b55c7fd
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 7 deletions.
10 changes: 10 additions & 0 deletions fastqueue/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,16 @@ def get_queue_stats(queue_id: str, session: Session = Depends(get_session)):
return QueueService.stats(id=queue_id, session=session)


@app.put(
"/queues/{queue_id}/purge",
status_code=status.HTTP_204_NO_CONTENT,
tags=["queues"],
responses={404: {"model": NotFoundSchema}},
)
def purge_queue_messages(queue_id: str, session: Session = Depends(get_session)):
return QueueService.purge(id=queue_id, session=session)


@app.delete(
"/queues/{queue_id}",
status_code=status.HTTP_204_NO_CONTENT,
Expand Down
20 changes: 13 additions & 7 deletions fastqueue/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ def list(

@classmethod
def delete(cls, id: str, session: Session) -> None:
get_model(model=Topic, filters={"id": id}, session=session)
session.query(Queue).filter_by(topic_id=id).update({"topic_id": None})
session.query(Topic).filter_by(id=id).delete()
topic = get_model(model=Topic, filters={"id": id}, session=session)
session.query(Queue).filter_by(topic_id=topic.id).update({"topic_id": None})
session.query(Topic).filter_by(id=topic.id).delete()
session.commit()


Expand Down Expand Up @@ -156,10 +156,10 @@ def list(

@classmethod
def delete(cls, id: str, session: Session) -> None:
get_model(model=Queue, filters={"id": id}, session=session)
session.query(Message).filter_by(queue_id=id).delete()
session.query(Queue).filter_by(dead_queue_id=id).update({"dead_queue_id": None})
session.query(Queue).filter_by(id=id).delete()
queue = get_model(model=Queue, filters={"id": id}, session=session)
session.query(Message).filter_by(queue_id=queue.id).delete()
session.query(Queue).filter_by(dead_queue_id=queue.id).update({"dead_queue_id": None})
session.query(Queue).filter_by(id=queue.id).delete()
session.commit()

@classmethod
Expand All @@ -182,6 +182,12 @@ def stats(cls, id: str, session: Session) -> QueueStatsSchema:
oldest_unacked_message_age_seconds=oldest_unacked_message_age_seconds,
)

@classmethod
def purge(cls, id: str, session: Session) -> None:
queue = get_model(model=Queue, filters={"id": id}, session=session)
session.query(Message).filter_by(queue_id=queue.id).delete()
session.commit()

@classmethod
def _cleanup_expired_messages(cls, queue: QueueSchema, session: Session) -> None:
now = datetime.utcnow()
Expand Down
33 changes: 33 additions & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,39 @@ def test_get_queue_stats(session, queue, client):
assert response_data == {"num_undelivered_messages": 5, "oldest_unacked_message_age_seconds": 10}


def test_get_queue_stats_not_found(session, client):
response = client.get("/queues/not-found-queue/stats")
response_data = response.json()

assert response.status_code == status.HTTP_404_NOT_FOUND
assert response_data == {"detail": "Queue not found"}


def test_purge_queue_messages(session, queue, client):
created_at = datetime.utcnow() - timedelta(seconds=10)
messages = MessageFactory.build_batch(5, queue_id=queue.id, created_at=created_at)
for message in messages:
session.add(message)
session.commit()

response = client.put(f"/queues/{queue.id}/purge")
assert response.status_code == status.HTTP_204_NO_CONTENT

response = client.get(f"/queues/{queue.id}/stats")
response_data = response.json()

assert response.status_code == status.HTTP_200_OK
assert response_data == {"num_undelivered_messages": 0, "oldest_unacked_message_age_seconds": 0}


def test_purge_queue_messages_not_found(session, client):
response = client.put("/queues/not-found-queue/purge")
response_data = response.json()

assert response.status_code == status.HTTP_404_NOT_FOUND
assert response_data == {"detail": "Queue not found"}


def test_delete_queue(session, queue, client):
response = client.delete(f"/queues/{queue.id}")

Expand Down
12 changes: 12 additions & 0 deletions tests/test_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,18 @@ def test_queue_service_stats(session, queue):
assert result.oldest_unacked_message_age_seconds == 10


def test_queue_service_purge(session, queue):
created_at = datetime.utcnow() - timedelta(seconds=10)
messages = MessageFactory.build_batch(5, queue_id=queue.id, created_at=created_at)
for message in messages:
session.add(message)
session.commit()
assert session.query(Message).filter_by(queue_id=queue.id).count() == 5

assert QueueService.purge(id=queue.id, session=session) is None
assert session.query(Message).filter_by(queue_id=queue.id).count() == 0


def test_queue_service_cleanup_expired_at(session, queue):
message1 = MessageFactory(queue_id=queue.id, expired_at=datetime.utcnow() - timedelta(seconds=1))
message2 = MessageFactory(queue_id=queue.id, expired_at=datetime.utcnow() + timedelta(seconds=1))
Expand Down

0 comments on commit b55c7fd

Please sign in to comment.