Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add dead queue support #18

Merged
merged 1 commit into from
Sep 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions alembic/versions/001_initial.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
"""Auto generated

Revision ID: b290fd728c17
Revision ID: fe94d60449f2
Revises:
Create Date: 2022-09-06 21:44:14.941396
Create Date: 2022-09-15 17:51:40.259256

"""
import sqlalchemy as sa
Expand All @@ -11,7 +11,7 @@
from alembic import op

# revision identifiers, used by Alembic.
revision = "b290fd728c17"
revision = "fe94d60449f2"
down_revision = None
branch_labels = None
depends_on = None
Expand All @@ -29,12 +29,17 @@ def upgrade() -> None:
"queues",
sa.Column("id", sa.String(length=128), nullable=False),
sa.Column("topic_id", sa.String(length=128), nullable=True),
sa.Column("dead_queue_id", sa.String(length=128), nullable=True),
sa.Column("ack_deadline_seconds", sa.Integer(), nullable=False),
sa.Column("message_retention_seconds", sa.Integer(), nullable=False),
sa.Column("message_filters", postgresql.JSONB(astext_type=sa.Text()), nullable=True),
sa.Column("message_max_deliveries", sa.Integer(), nullable=True),
sa.Column("created_at", sa.DateTime(), nullable=False),
sa.Column("updated_at", sa.DateTime(), nullable=False),
sa.ForeignKeyConstraint(
["dead_queue_id"],
["queues.id"],
),
sa.ForeignKeyConstraint(
["topic_id"],
["topics.id"],
Expand Down
3 changes: 3 additions & 0 deletions fastqueue/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ class Queue(Base):
topic_id = sqlalchemy.Column(
sqlalchemy.String(length=128), sqlalchemy.ForeignKey("topics.id"), index=True, nullable=True
)
dead_queue_id = sqlalchemy.Column(
sqlalchemy.String(length=128), sqlalchemy.ForeignKey("queues.id"), nullable=True
)
ack_deadline_seconds = sqlalchemy.Column(sqlalchemy.Integer, nullable=False)
message_retention_seconds = sqlalchemy.Column(sqlalchemy.Integer, nullable=False)
message_filters = sqlalchemy.Column(postgresql.JSONB, nullable=True)
Expand Down
3 changes: 3 additions & 0 deletions fastqueue/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class ListTopicSchema(Schema):
class CreateQueueSchema(Schema):
id: str = Field(..., regex=regex_for_id, max_length=128)
topic_id: str | None = Field(None, regex=regex_for_id, max_length=128)
dead_queue_id: str | None = Field(None, regex=regex_for_id, max_length=128)
ack_deadline_seconds: int = Field(
..., ge=settings.min_ack_deadline_seconds, le=settings.max_ack_deadline_seconds
)
Expand All @@ -46,6 +47,7 @@ class CreateQueueSchema(Schema):

class UpdateQueueSchema(Schema):
topic_id: str | None = Field(None, regex=regex_for_id, max_length=128)
dead_queue_id: str | None = Field(None, regex=regex_for_id, max_length=128)
ack_deadline_seconds: int = Field(
..., ge=settings.min_ack_deadline_seconds, le=settings.max_ack_deadline_seconds
)
Expand All @@ -61,6 +63,7 @@ class UpdateQueueSchema(Schema):
class QueueSchema(Schema):
id: str
topic_id: str | None
dead_queue_id: str | None
ack_deadline_seconds: int
message_retention_seconds: int
message_filters: dict[str, list[str]] | None
Expand Down
42 changes: 39 additions & 3 deletions fastqueue/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,14 @@ def create(cls, data: CreateQueueSchema, session: Session) -> QueueSchema:
if data.topic_id is not None:
TopicService.get(data.topic_id, session=session)

if data.dead_queue_id is not None:
cls.get(data.dead_queue_id, session=session)

now = datetime.utcnow()
queue = Queue(
id=data.id,
topic_id=data.topic_id,
dead_queue_id=data.dead_queue_id,
ack_deadline_seconds=data.ack_deadline_seconds,
message_retention_seconds=data.message_retention_seconds,
message_filters=data.message_filters,
Expand All @@ -123,7 +127,11 @@ def update(cls, id: str, data: UpdateQueueSchema, session: Session) -> QueueSche
if data.topic_id is not None:
TopicService.get(data.topic_id, session=session)

if data.dead_queue_id is not None:
cls.get(data.dead_queue_id, session=session)

queue.topic_id = data.topic_id
queue.dead_queue_id = data.dead_queue_id
queue.ack_deadline_seconds = data.ack_deadline_seconds
queue.message_retention_seconds = data.message_retention_seconds
queue.message_filters = data.message_filters
Expand Down Expand Up @@ -153,6 +161,7 @@ def list(
def delete(cls, id: str, session: Session) -> None:
cls.get(id, session=session)
session.query(Message).filter_by(queue_id=id).delete()
session.query(Queue).filter_by(dead_queue_id=id).update({"dead_queue_id": None})
session.query(Queue).filter_by(id=id).delete()
session.commit()

Expand All @@ -176,20 +185,47 @@ def stats(cls, id: str, session: Session) -> QueueStatsSchema:
)

@classmethod
def cleanup(cls, id: str, session: Session) -> None:
queue = cls.get(id=id, session=session)
def _cleanup_expired_messages(cls, queue: QueueSchema, session: Session) -> None:
now = datetime.utcnow()

expired_at_filter = [Message.queue_id == queue.id, Message.expired_at <= now]
session.query(Message).filter(*expired_at_filter).delete()

if queue.message_max_deliveries is not None:
@classmethod
def _cleanup_delivery_attempts_exceeded_messages(cls, queue: QueueSchema, session: Session) -> None:
if queue.message_max_deliveries is not None and queue.dead_queue_id is None:
delivery_attempts_filter = [
Message.queue_id == queue.id,
Message.delivery_attempts >= queue.message_max_deliveries,
]
session.query(Message).filter(*delivery_attempts_filter).delete()

@classmethod
def _cleanup_move_messages_to_dead_queue(cls, queue: QueueSchema, session: Session) -> None:
if queue.message_max_deliveries is not None and queue.dead_queue_id is not None:
dead_queue = cls.get(id=queue.dead_queue_id, session=session)
delivery_attempts_filter = [
Message.queue_id == queue.id,
Message.delivery_attempts >= queue.message_max_deliveries,
]
now = datetime.utcnow()
update_data = {
"queue_id": queue.dead_queue_id,
"delivery_attempts": 0,
"expired_at": now + timedelta(seconds=dead_queue.message_retention_seconds),
"scheduled_at": now,
"updated_at": now,
}
session.query(Message).filter(*delivery_attempts_filter).update(update_data)

@classmethod
def cleanup(cls, id: str, session: Session) -> None:
queue = cls.get(id=id, session=session)

cls._cleanup_expired_messages(queue=queue, session=session)
cls._cleanup_delivery_attempts_exceeded_messages(queue=queue, session=session)
cls._cleanup_move_messages_to_dead_queue(queue=queue, session=session)

session.commit()


Expand Down
1 change: 1 addition & 0 deletions tests/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class Meta:
model = Queue

id = factory.Sequence(lambda n: "queue_%s" % n)
dead_queue_id = None
ack_deadline_seconds = default_ack_deadline_seconds
message_retention_seconds = default_message_retention_seconds
message_max_deliveries = None
Expand Down
39 changes: 39 additions & 0 deletions tests/test_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,19 @@ def test_queue_service_delete(session, queue):
assert session.query(Message).filter_by(queue_id=queue.id).count() == 0


def test_queue_service_delete_dead_queue(session, queue):
dead_queue = QueueFactory()
session.add(dead_queue)
session.commit()
queue.dead_queue_id = dead_queue.id
session.commit()

assert QueueService.delete(dead_queue.id, session=session) is None

session.refresh(queue)
assert queue.dead_queue_id is None


def test_queue_service_delete_not_found(session):
with pytest.raises(NotFoundError):
QueueService.delete("invalid-queue-name", session=session)
Expand Down Expand Up @@ -201,6 +214,32 @@ def test_queue_service_cleanup_delivery_attempts(session, queue):
assert session.query(Message).filter_by(queue_id=queue.id).first() == message1


def test_queue_service_cleanup_move_to_dead_queue(session, queue):
dead_queue = QueueFactory()
session.add(dead_queue)
session.commit()
queue.message_max_deliveries = 2
queue.dead_queue_id = dead_queue.id
message1 = MessageFactory(queue_id=queue.id, delivery_attempts=1)
message2 = MessageFactory(queue_id=queue.id, delivery_attempts=2)
message3 = MessageFactory(queue_id=queue.id, delivery_attempts=3)
session.add(message1)
session.add(message2)
session.add(message3)
session.commit()
assert session.query(Message).filter_by(queue_id=queue.id).count() == 3

assert QueueService.cleanup(id=queue.id, session=session) is None
assert session.query(Message).filter_by(queue_id=queue.id).count() == 1
assert session.query(Message).filter_by(queue_id=queue.id).first() == message1

assert session.query(Message).filter_by(queue_id=dead_queue.id).count() == 2
session.refresh(message2)
session.refresh(message3)
assert message2.delivery_attempts == 0
assert message3.delivery_attempts == 0


@pytest.mark.parametrize(
"queue_filters,message_attributes,expected",
[
Expand Down