Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add ack and nack endpoints #13

Merged
merged 1 commit into from
Sep 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions fastqueue/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,16 @@ def list_messages_for_consume(queue_id: str, limit: int = 10, session: Session =
return MessageService.list_for_consume(queue_id=queue_id, limit=limit, session=session)


@app.post("/messages/{message_id}/ack", status_code=status.HTTP_204_NO_CONTENT, tags=["messages"])
def ack_message(message_id: str, session: Session = Depends(get_session)):
return MessageService.ack(id=message_id, session=session)


@app.post("/messages/{message_id}/nack", status_code=status.HTTP_204_NO_CONTENT, tags=["messages"])
def nack_message(message_id: str, session: Session = Depends(get_session)):
return MessageService.nack(id=message_id, session=session)


def run_server():
uvicorn.run(
"fastqueue.api:app",
Expand Down
16 changes: 16 additions & 0 deletions fastqueue/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,3 +232,19 @@ def list_for_consume(cls, queue_id: str, limit: int, session: Session) -> ListMe

session.commit()
return ListMessageSchema(data=data)

@classmethod
def ack(cls, id: str, session: Session) -> None:
session.query(Message).filter_by(id=id).delete()
session.commit()

@classmethod
def nack(cls, id: str, session: Session) -> None:
message = get_model(model=Message, filters={"id": id}, session=session)
if message is None:
return

now = datetime.utcnow()
message.scheduled_at = now
message.updated_at = now
session.commit()
12 changes: 12 additions & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,3 +211,15 @@ def test_list_messages_for_consume(session, message, client):
assert m["queue_id"] == message.queue_id
assert m["data"] == message.data
assert m["attributes"] == message.attributes


def test_ack_message(session, message, client):
response = client.post(f"/messages/{message.id}/ack")

assert response.status_code == status.HTTP_204_NO_CONTENT


def test_nack_message(session, message, client):
response = client.post(f"/messages/{message.id}/nack")

assert response.status_code == status.HTTP_204_NO_CONTENT
32 changes: 32 additions & 0 deletions tests/test_services.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import uuid
from datetime import datetime
from time import sleep

Expand Down Expand Up @@ -274,3 +275,34 @@ def test_message_service_list_for_consume_with_message_max_deliveries(session, q
now = datetime.utcnow()
result = MessageService.list_for_consume(queue_id=queue.id, limit=10, session=session)
assert len(result.data) == 0


def test_message_service_ack(session, message):
assert session.query(Message).filter_by(id=message.id).count() == 1
assert MessageService.ack(id=message.id, session=session) is None
assert session.query(Message).filter_by(id=message.id).count() == 0


def test_message_service_ack_with_removed_message(session):
id = uuid.uuid4().hex
assert session.query(Message).filter_by(id=id).count() == 0
assert MessageService.ack(id=id, session=session) is None
assert session.query(Message).filter_by(id=id).count() == 0


def test_message_service_nack(session, message):
original_scheduled_at = message.scheduled_at
original_updated_at = message.updated_at
assert session.query(Message).filter_by(id=message.id).count() == 1
assert MessageService.nack(id=message.id, session=session) is None
assert session.query(Message).filter_by(id=message.id).count() == 1
session.refresh(message)
assert message.scheduled_at != original_scheduled_at
assert message.updated_at != original_updated_at


def test_message_service_nack_with_removed_message(session):
id = uuid.uuid4().hex
assert session.query(Message).filter_by(id=id).count() == 0
assert MessageService.nack(id=id, session=session) is None
assert session.query(Message).filter_by(id=id).count() == 0