Skip to content

Commit

Permalink
feat: add dead queue support (#18)
Browse files Browse the repository at this point in the history
  • Loading branch information
allisson authored Sep 15, 2022
1 parent 6627950 commit 8e32aa0
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 6 deletions.
11 changes: 8 additions & 3 deletions alembic/versions/001_initial.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
"""Auto generated
Revision ID: b290fd728c17
Revision ID: fe94d60449f2
Revises:
Create Date: 2022-09-06 21:44:14.941396
Create Date: 2022-09-15 17:51:40.259256
"""
import sqlalchemy as sa
Expand All @@ -11,7 +11,7 @@
from alembic import op

# revision identifiers, used by Alembic.
revision = "b290fd728c17"
revision = "fe94d60449f2"
down_revision = None
branch_labels = None
depends_on = None
Expand All @@ -29,12 +29,17 @@ def upgrade() -> None:
"queues",
sa.Column("id", sa.String(length=128), nullable=False),
sa.Column("topic_id", sa.String(length=128), nullable=True),
sa.Column("dead_queue_id", sa.String(length=128), nullable=True),
sa.Column("ack_deadline_seconds", sa.Integer(), nullable=False),
sa.Column("message_retention_seconds", sa.Integer(), nullable=False),
sa.Column("message_filters", postgresql.JSONB(astext_type=sa.Text()), nullable=True),
sa.Column("message_max_deliveries", sa.Integer(), nullable=True),
sa.Column("created_at", sa.DateTime(), nullable=False),
sa.Column("updated_at", sa.DateTime(), nullable=False),
sa.ForeignKeyConstraint(
["dead_queue_id"],
["queues.id"],
),
sa.ForeignKeyConstraint(
["topic_id"],
["topics.id"],
Expand Down
3 changes: 3 additions & 0 deletions fastqueue/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ class Queue(Base):
topic_id = sqlalchemy.Column(
sqlalchemy.String(length=128), sqlalchemy.ForeignKey("topics.id"), index=True, nullable=True
)
dead_queue_id = sqlalchemy.Column(
sqlalchemy.String(length=128), sqlalchemy.ForeignKey("queues.id"), nullable=True
)
ack_deadline_seconds = sqlalchemy.Column(sqlalchemy.Integer, nullable=False)
message_retention_seconds = sqlalchemy.Column(sqlalchemy.Integer, nullable=False)
message_filters = sqlalchemy.Column(postgresql.JSONB, nullable=True)
Expand Down
3 changes: 3 additions & 0 deletions fastqueue/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class ListTopicSchema(Schema):
class CreateQueueSchema(Schema):
id: str = Field(..., regex=regex_for_id, max_length=128)
topic_id: str | None = Field(None, regex=regex_for_id, max_length=128)
dead_queue_id: str | None = Field(None, regex=regex_for_id, max_length=128)
ack_deadline_seconds: int = Field(
..., ge=settings.min_ack_deadline_seconds, le=settings.max_ack_deadline_seconds
)
Expand All @@ -46,6 +47,7 @@ class CreateQueueSchema(Schema):

class UpdateQueueSchema(Schema):
topic_id: str | None = Field(None, regex=regex_for_id, max_length=128)
dead_queue_id: str | None = Field(None, regex=regex_for_id, max_length=128)
ack_deadline_seconds: int = Field(
..., ge=settings.min_ack_deadline_seconds, le=settings.max_ack_deadline_seconds
)
Expand All @@ -61,6 +63,7 @@ class UpdateQueueSchema(Schema):
class QueueSchema(Schema):
id: str
topic_id: str | None
dead_queue_id: str | None
ack_deadline_seconds: int
message_retention_seconds: int
message_filters: dict[str, list[str]] | None
Expand Down
42 changes: 39 additions & 3 deletions fastqueue/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,14 @@ def create(cls, data: CreateQueueSchema, session: Session) -> QueueSchema:
if data.topic_id is not None:
TopicService.get(data.topic_id, session=session)

if data.dead_queue_id is not None:
cls.get(data.dead_queue_id, session=session)

now = datetime.utcnow()
queue = Queue(
id=data.id,
topic_id=data.topic_id,
dead_queue_id=data.dead_queue_id,
ack_deadline_seconds=data.ack_deadline_seconds,
message_retention_seconds=data.message_retention_seconds,
message_filters=data.message_filters,
Expand All @@ -123,7 +127,11 @@ def update(cls, id: str, data: UpdateQueueSchema, session: Session) -> QueueSche
if data.topic_id is not None:
TopicService.get(data.topic_id, session=session)

if data.dead_queue_id is not None:
cls.get(data.dead_queue_id, session=session)

queue.topic_id = data.topic_id
queue.dead_queue_id = data.dead_queue_id
queue.ack_deadline_seconds = data.ack_deadline_seconds
queue.message_retention_seconds = data.message_retention_seconds
queue.message_filters = data.message_filters
Expand Down Expand Up @@ -153,6 +161,7 @@ def list(
def delete(cls, id: str, session: Session) -> None:
cls.get(id, session=session)
session.query(Message).filter_by(queue_id=id).delete()
session.query(Queue).filter_by(dead_queue_id=id).update({"dead_queue_id": None})
session.query(Queue).filter_by(id=id).delete()
session.commit()

Expand All @@ -176,20 +185,47 @@ def stats(cls, id: str, session: Session) -> QueueStatsSchema:
)

@classmethod
def cleanup(cls, id: str, session: Session) -> None:
queue = cls.get(id=id, session=session)
def _cleanup_expired_messages(cls, queue: QueueSchema, session: Session) -> None:
now = datetime.utcnow()

expired_at_filter = [Message.queue_id == queue.id, Message.expired_at <= now]
session.query(Message).filter(*expired_at_filter).delete()

if queue.message_max_deliveries is not None:
@classmethod
def _cleanup_delivery_attempts_exceeded_messages(cls, queue: QueueSchema, session: Session) -> None:
if queue.message_max_deliveries is not None and queue.dead_queue_id is None:
delivery_attempts_filter = [
Message.queue_id == queue.id,
Message.delivery_attempts >= queue.message_max_deliveries,
]
session.query(Message).filter(*delivery_attempts_filter).delete()

@classmethod
def _cleanup_move_messages_to_dead_queue(cls, queue: QueueSchema, session: Session) -> None:
if queue.message_max_deliveries is not None and queue.dead_queue_id is not None:
dead_queue = cls.get(id=queue.dead_queue_id, session=session)
delivery_attempts_filter = [
Message.queue_id == queue.id,
Message.delivery_attempts >= queue.message_max_deliveries,
]
now = datetime.utcnow()
update_data = {
"queue_id": queue.dead_queue_id,
"delivery_attempts": 0,
"expired_at": now + timedelta(seconds=dead_queue.message_retention_seconds),
"scheduled_at": now,
"updated_at": now,
}
session.query(Message).filter(*delivery_attempts_filter).update(update_data)

@classmethod
def cleanup(cls, id: str, session: Session) -> None:
queue = cls.get(id=id, session=session)

cls._cleanup_expired_messages(queue=queue, session=session)
cls._cleanup_delivery_attempts_exceeded_messages(queue=queue, session=session)
cls._cleanup_move_messages_to_dead_queue(queue=queue, session=session)

session.commit()


Expand Down
1 change: 1 addition & 0 deletions tests/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class Meta:
model = Queue

id = factory.Sequence(lambda n: "queue_%s" % n)
dead_queue_id = None
ack_deadline_seconds = default_ack_deadline_seconds
message_retention_seconds = default_message_retention_seconds
message_max_deliveries = None
Expand Down
39 changes: 39 additions & 0 deletions tests/test_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,19 @@ def test_queue_service_delete(session, queue):
assert session.query(Message).filter_by(queue_id=queue.id).count() == 0


def test_queue_service_delete_dead_queue(session, queue):
dead_queue = QueueFactory()
session.add(dead_queue)
session.commit()
queue.dead_queue_id = dead_queue.id
session.commit()

assert QueueService.delete(dead_queue.id, session=session) is None

session.refresh(queue)
assert queue.dead_queue_id is None


def test_queue_service_delete_not_found(session):
with pytest.raises(NotFoundError):
QueueService.delete("invalid-queue-name", session=session)
Expand Down Expand Up @@ -201,6 +214,32 @@ def test_queue_service_cleanup_delivery_attempts(session, queue):
assert session.query(Message).filter_by(queue_id=queue.id).first() == message1


def test_queue_service_cleanup_move_to_dead_queue(session, queue):
dead_queue = QueueFactory()
session.add(dead_queue)
session.commit()
queue.message_max_deliveries = 2
queue.dead_queue_id = dead_queue.id
message1 = MessageFactory(queue_id=queue.id, delivery_attempts=1)
message2 = MessageFactory(queue_id=queue.id, delivery_attempts=2)
message3 = MessageFactory(queue_id=queue.id, delivery_attempts=3)
session.add(message1)
session.add(message2)
session.add(message3)
session.commit()
assert session.query(Message).filter_by(queue_id=queue.id).count() == 3

assert QueueService.cleanup(id=queue.id, session=session) is None
assert session.query(Message).filter_by(queue_id=queue.id).count() == 1
assert session.query(Message).filter_by(queue_id=queue.id).first() == message1

assert session.query(Message).filter_by(queue_id=dead_queue.id).count() == 2
session.refresh(message2)
session.refresh(message3)
assert message2.delivery_attempts == 0
assert message3.delivery_attempts == 0


@pytest.mark.parametrize(
"queue_filters,message_attributes,expected",
[
Expand Down

0 comments on commit 8e32aa0

Please sign in to comment.