Skip to content

Commit

Permalink
feat: add ack and nack endpoints (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
allisson authored Sep 7, 2022
1 parent be80f17 commit 940a5c6
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 0 deletions.
10 changes: 10 additions & 0 deletions fastqueue/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,16 @@ def list_messages_for_consume(queue_id: str, limit: int = 10, session: Session =
return MessageService.list_for_consume(queue_id=queue_id, limit=limit, session=session)


@app.post("/messages/{message_id}/ack", status_code=status.HTTP_204_NO_CONTENT, tags=["messages"])
def ack_message(message_id: str, session: Session = Depends(get_session)):
return MessageService.ack(id=message_id, session=session)


@app.post("/messages/{message_id}/nack", status_code=status.HTTP_204_NO_CONTENT, tags=["messages"])
def nack_message(message_id: str, session: Session = Depends(get_session)):
return MessageService.nack(id=message_id, session=session)


def run_server():
uvicorn.run(
"fastqueue.api:app",
Expand Down
16 changes: 16 additions & 0 deletions fastqueue/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,3 +232,19 @@ def list_for_consume(cls, queue_id: str, limit: int, session: Session) -> ListMe

session.commit()
return ListMessageSchema(data=data)

@classmethod
def ack(cls, id: str, session: Session) -> None:
session.query(Message).filter_by(id=id).delete()
session.commit()

@classmethod
def nack(cls, id: str, session: Session) -> None:
message = get_model(model=Message, filters={"id": id}, session=session)
if message is None:
return

now = datetime.utcnow()
message.scheduled_at = now
message.updated_at = now
session.commit()
12 changes: 12 additions & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,3 +211,15 @@ def test_list_messages_for_consume(session, message, client):
assert m["queue_id"] == message.queue_id
assert m["data"] == message.data
assert m["attributes"] == message.attributes


def test_ack_message(session, message, client):
response = client.post(f"/messages/{message.id}/ack")

assert response.status_code == status.HTTP_204_NO_CONTENT


def test_nack_message(session, message, client):
response = client.post(f"/messages/{message.id}/nack")

assert response.status_code == status.HTTP_204_NO_CONTENT
32 changes: 32 additions & 0 deletions tests/test_services.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import uuid
from datetime import datetime
from time import sleep

Expand Down Expand Up @@ -274,3 +275,34 @@ def test_message_service_list_for_consume_with_message_max_deliveries(session, q
now = datetime.utcnow()
result = MessageService.list_for_consume(queue_id=queue.id, limit=10, session=session)
assert len(result.data) == 0


def test_message_service_ack(session, message):
assert session.query(Message).filter_by(id=message.id).count() == 1
assert MessageService.ack(id=message.id, session=session) is None
assert session.query(Message).filter_by(id=message.id).count() == 0


def test_message_service_ack_with_removed_message(session):
id = uuid.uuid4().hex
assert session.query(Message).filter_by(id=id).count() == 0
assert MessageService.ack(id=id, session=session) is None
assert session.query(Message).filter_by(id=id).count() == 0


def test_message_service_nack(session, message):
original_scheduled_at = message.scheduled_at
original_updated_at = message.updated_at
assert session.query(Message).filter_by(id=message.id).count() == 1
assert MessageService.nack(id=message.id, session=session) is None
assert session.query(Message).filter_by(id=message.id).count() == 1
session.refresh(message)
assert message.scheduled_at != original_scheduled_at
assert message.updated_at != original_updated_at


def test_message_service_nack_with_removed_message(session):
id = uuid.uuid4().hex
assert session.query(Message).filter_by(id=id).count() == 0
assert MessageService.nack(id=id, session=session) is None
assert session.query(Message).filter_by(id=id).count() == 0

0 comments on commit 940a5c6

Please sign in to comment.