Skip to content

Commit

Permalink
feat: add redrive support (#31)
Browse files Browse the repository at this point in the history
  • Loading branch information
allisson authored Oct 7, 2022
1 parent f602055 commit 3eac767
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 9 deletions.
69 changes: 69 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
# fastqueue
Simple queue system based on FastAPI and PostgreSQL.

## Features

- Simple rest api.
- Message filtering support.
- Dead queue support.
- Redrive support (move messages between queues).
- Simplicity, it does the minimum necessary, it will not have an authentication/permission scheme among other things.

## Quickstart

Let's start with the basic concepts, we have three main entities that we must know to start:
Expand Down Expand Up @@ -676,3 +684,64 @@ content-type: application/json

{"num_undelivered_messages":2,"oldest_unacked_message_age_seconds":5410}
```

## Redrive support

With redrive you can move messages between queues, we usually use this feature to move messages from the dead queue to the original queue.

```bash
curl -i -X 'GET' \
'http://localhost:8000/queues/all-events/stats' \
-H 'accept: application/json'

HTTP/1.1 200 OK
date: Fri, 07 Oct 2022 20:13:15 GMT
server: uvicorn
content-length: 69
content-type: application/json

{"num_undelivered_messages":0,"oldest_unacked_message_age_seconds":0}
```

```bash
curl -i -X 'GET' \
'http://localhost:8000/queues/all-events-dead/stats' \
-H 'accept: application/json'

HTTP/1.1 200 OK
date: Fri, 07 Oct 2022 20:13:36 GMT
server: uvicorn
content-length: 71
content-type: application/json

{"num_undelivered_messages":2,"oldest_unacked_message_age_seconds":573}
```

```bash
curl -i -X 'PUT' \
'http://localhost:8000/queues/all-events-dead/redrive' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
"destination_queue_id": "all-events"
}'

HTTP/1.1 204 No Content
date: Fri, 07 Oct 2022 20:16:39 GMT
server: uvicorn
content-type: application/json
```

```bash
curl -i -X 'GET' \
'http://localhost:8000/queues/all-events/stats' \
-H 'accept: application/json'

HTTP/1.1 200 OK
date: Fri, 07 Oct 2022 20:17:09 GMT
server: uvicorn
content-length: 71
content-type: application/json

{"num_undelivered_messages":2,"oldest_unacked_message_age_seconds":787}
```
11 changes: 11 additions & 0 deletions fastqueue/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
NotFoundSchema,
QueueSchema,
QueueStatsSchema,
RedriveQueueSchema,
TopicSchema,
UpdateQueueSchema,
)
Expand Down Expand Up @@ -140,6 +141,16 @@ def purge_queue_messages(queue_id: str, session: Session = Depends(get_session))
return QueueService.purge(id=queue_id, session=session)


@app.put(
"/queues/{queue_id}/redrive",
status_code=status.HTTP_204_NO_CONTENT,
tags=["queues"],
responses={404: {"model": NotFoundSchema}},
)
def redrive_queue_messages(queue_id: str, data: RedriveQueueSchema, session: Session = Depends(get_session)):
return QueueService.redrive(id=queue_id, data=data, session=session)


@app.delete(
"/queues/{queue_id}",
status_code=status.HTTP_204_NO_CONTENT,
Expand Down
4 changes: 4 additions & 0 deletions fastqueue/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ class QueueStatsSchema(Schema):
oldest_unacked_message_age_seconds: int


class RedriveQueueSchema(Schema):
destination_queue_id: str


class ListQueueSchema(Schema):
data: list[QueueSchema]

Expand Down
34 changes: 26 additions & 8 deletions fastqueue/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
MessageSchema,
QueueSchema,
QueueStatsSchema,
RedriveQueueSchema,
TopicSchema,
UpdateQueueSchema,
)
Expand Down Expand Up @@ -58,6 +59,13 @@ def get_model(model: Any, filters: dict | None, session: Session) -> Any:
return instance


def get_filters_for_consume(queue: Any, now: datetime) -> list:
filters = [Message.queue_id == queue.id, Message.expired_at >= now, Message.scheduled_at <= now]
if queue.dead_queue_id is not None and queue.message_max_deliveries is not None:
filters.append(Message.delivery_attempts < queue.message_max_deliveries)
return filters


class TopicService:
@classmethod
def create(cls, data: CreateTopicSchema, session: Session) -> TopicSchema:
Expand Down Expand Up @@ -167,10 +175,7 @@ def stats(cls, id: str, session: Session) -> QueueStatsSchema:
queue = get_model(model=Queue, filters={"id": id}, session=session)

now = datetime.utcnow()
filters = [Message.queue_id == queue.id, Message.expired_at >= now, Message.scheduled_at <= now]
if queue.message_max_deliveries is not None:
filters.append(Message.delivery_attempts < queue.message_max_deliveries)

filters = get_filters_for_consume(queue, now)
num_undelivered_messages = session.query(Message).filter(*filters).count()
oldest_unacked_message_age_seconds = 0
oldest_unacked_message = session.query(Message).filter(*filters).order_by(Message.created_at).first()
Expand Down Expand Up @@ -225,6 +230,22 @@ def cleanup(cls, id: str, session: Session) -> None:

session.commit()

@classmethod
def redrive(cls, id: str, data: RedriveQueueSchema, session: Session) -> None:
queue = get_model(model=Queue, filters={"id": id}, session=session)
destination_queue = get_model(model=Queue, filters={"id": data.destination_queue_id}, session=session)
now = datetime.utcnow()
filters = get_filters_for_consume(queue, now)
update_data = {
"queue_id": destination_queue.id,
"delivery_attempts": 0,
"expired_at": now + timedelta(seconds=destination_queue.message_retention_seconds),
"scheduled_at": now,
"updated_at": now,
}
session.query(Message).filter(*filters).update(update_data)
session.commit()


class MessageService:
@classmethod
Expand Down Expand Up @@ -279,10 +300,7 @@ def create(cls, topic_id: str, data: CreateMessageSchema, session: Session) -> L
def list_for_consume(cls, queue_id: str, limit: int, session: Session) -> ListMessageSchema:
queue = QueueService.get(id=queue_id, session=session)
now = datetime.utcnow()
filters = [Message.queue_id == queue.id, Message.expired_at >= now, Message.scheduled_at <= now]
if queue.dead_queue_id is not None and queue.message_max_deliveries is not None:
filters.append(Message.delivery_attempts < queue.message_max_deliveries)

filters = get_filters_for_consume(queue, now)
data = []
messages = (
session.query(Message).filter(*filters).with_for_update(skip_locked=True).limit(limit).all()
Expand Down
26 changes: 26 additions & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,32 @@ def test_purge_queue_messages_not_found(session, client):
assert response_data == {"detail": "Queue not found"}


def test_redrive_queue_messages(session, queue, client):
dead_queue = QueueFactory()
session.add(dead_queue)
session.commit()

session.add(MessageFactory(queue_id=dead_queue.id))
session.commit()

response = client.put(f"/queues/{dead_queue.id}/redrive", json={"destination_queue_id": queue.id})
assert response.status_code == status.HTTP_204_NO_CONTENT

response = client.get(f"/queues/{queue.id}/stats")
response_data = response.json()

assert response.status_code == status.HTTP_200_OK
assert response_data == {"num_undelivered_messages": 1, "oldest_unacked_message_age_seconds": 0}


def test_redrive_queue_messages_not_found(session, queue, client):
response = client.put("/queues/not-found-queue/redrive", json={"destination_queue_id": queue.id})
response_data = response.json()

assert response.status_code == status.HTTP_404_NOT_FOUND
assert response_data == {"detail": "Queue not found"}


def test_delete_queue(session, queue, client):
response = client.delete(f"/queues/{queue.id}")

Expand Down
34 changes: 33 additions & 1 deletion tests/test_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,13 @@

from fastqueue.exceptions import NotFoundError
from fastqueue.models import Message, Queue
from fastqueue.schemas import CreateMessageSchema, CreateQueueSchema, CreateTopicSchema, UpdateQueueSchema
from fastqueue.schemas import (
CreateMessageSchema,
CreateQueueSchema,
CreateTopicSchema,
RedriveQueueSchema,
UpdateQueueSchema,
)
from fastqueue.services import MessageService, QueueService, TopicService
from tests.factories import MessageFactory, QueueFactory, TopicFactory

Expand Down Expand Up @@ -235,6 +241,32 @@ def test_queue_service_cleanup_move_to_dead_queue(session, queue):
assert message3.delivery_attempts == 0


def test_queue_service_redrive(session, queue):
dead_queue = QueueFactory()
session.add(dead_queue)
session.commit()
message1 = MessageFactory(queue_id=dead_queue.id)
message2 = MessageFactory(queue_id=dead_queue.id)
message3 = MessageFactory(queue_id=dead_queue.id)
session.add(message1)
session.add(message2)
session.add(message3)
session.commit()

assert session.query(Message).filter_by(queue_id=queue.id).count() == 0
assert session.query(Message).filter_by(queue_id=dead_queue.id).count() == 3

data = RedriveQueueSchema(destination_queue_id=queue.id, message_count=3)
assert QueueService.redrive(id=dead_queue.id, data=data, session=session) is None

assert session.query(Message).filter_by(queue_id=queue.id).count() == 3
assert session.query(Message).filter_by(queue_id=dead_queue.id).count() == 0
messages = session.query(Message).filter_by(queue_id=queue.id).all()
assert message1 in messages
assert message2 in messages
assert message3 in messages


@pytest.mark.parametrize(
"queue_filters,message_attributes,expected",
[
Expand Down

0 comments on commit 3eac767

Please sign in to comment.