Skip to content

Commit

Permalink
feat: add QueueService (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
allisson authored Sep 3, 2022
1 parent 08ccad7 commit 29c9768
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 36 deletions.
11 changes: 3 additions & 8 deletions alembic/versions/001_initial.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
"""Auto generated
Revision ID: ed7f6025b378
Revision ID: 38d1359d1e34
Revises:
Create Date: 2022-09-02 19:37:18.428504
Create Date: 2022-09-03 20:31:52.937422
"""
import sqlalchemy as sa
Expand All @@ -11,7 +11,7 @@
from alembic import op

# revision identifiers, used by Alembic.
revision = "ed7f6025b378"
revision = "38d1359d1e34"
down_revision = None
branch_labels = None
depends_on = None
Expand All @@ -32,16 +32,11 @@ def upgrade() -> None:
sa.Column("ack_deadline_seconds", sa.Integer(), nullable=False),
sa.Column("message_retention_seconds", sa.Integer(), nullable=False),
sa.Column("message_filters", postgresql.JSONB(astext_type=sa.Text()), nullable=True),
sa.Column("dead_letter_queue_id", sa.String(length=128), nullable=True),
sa.Column("dead_letter_max_retries", sa.Integer(), nullable=True),
sa.Column("dead_letter_min_backoff_seconds", sa.Integer(), nullable=True),
sa.Column("dead_letter_max_backoff_seconds", sa.Integer(), nullable=True),
sa.Column("created_at", sa.DateTime(), nullable=False),
sa.Column("updated_at", sa.DateTime(), nullable=False),
sa.ForeignKeyConstraint(
["dead_letter_queue_id"],
["queues.id"],
),
sa.ForeignKeyConstraint(
["topic_id"],
["topics.id"],
Expand Down
3 changes: 0 additions & 3 deletions fastqueue/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ class Queue(Base):
ack_deadline_seconds = sqlalchemy.Column(sqlalchemy.Integer, nullable=False)
message_retention_seconds = sqlalchemy.Column(sqlalchemy.Integer, nullable=False)
message_filters = sqlalchemy.Column(postgresql.JSONB, nullable=True)
dead_letter_queue_id = sqlalchemy.Column(
sqlalchemy.String(length=128), sqlalchemy.ForeignKey("queues.id"), nullable=True
)
dead_letter_max_retries = sqlalchemy.Column(sqlalchemy.Integer, nullable=True)
dead_letter_min_backoff_seconds = sqlalchemy.Column(sqlalchemy.Integer, nullable=True)
dead_letter_max_backoff_seconds = sqlalchemy.Column(sqlalchemy.Integer, nullable=True)
Expand Down
3 changes: 0 additions & 3 deletions fastqueue/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ class CreateQueueSchema(Schema):
ack_deadline_seconds: int
message_retention_seconds: int
message_filters: dict | None = None
dead_letter_queue_id: str | None = Field(None, regex=regex_for_id, max_length=128)
dead_letter_max_retries: int | None = None
dead_letter_min_backoff_seconds: int | None = None
dead_letter_max_backoff_seconds: int | None = None
Expand All @@ -33,7 +32,6 @@ class UpdateQueueSchema(Schema):
ack_deadline_seconds: int
message_retention_seconds: int
message_filters: dict | None = None
dead_letter_queue_id: str | None = Field(None, regex=regex_for_id, max_length=128)
dead_letter_max_retries: int | None = None
dead_letter_min_backoff_seconds: int | None = None
dead_letter_max_backoff_seconds: int | None = None
Expand All @@ -45,7 +43,6 @@ class QueueSchema(Schema):
ack_deadline_seconds: int
message_retention_seconds: int
message_filters: dict | None = None
dead_letter_queue_id: str | None = Field(None, regex=regex_for_id, max_length=128)
dead_letter_max_retries: int | None = None
dead_letter_min_backoff_seconds: int | None = None
dead_letter_max_backoff_seconds: int | None = None
Expand Down
84 changes: 78 additions & 6 deletions fastqueue/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,94 @@
from sqlalchemy.orm import Session

from fastqueue.exceptions import NotFoundError
from fastqueue.models import Topic
from fastqueue.schemas import CreateTopicSchema, TopicSchema
from fastqueue.models import Queue, Topic
from fastqueue.schemas import (
CreateQueueSchema,
CreateTopicSchema,
QueueSchema,
TopicSchema,
UpdateQueueSchema,
)


class TopicService:
def create(self, data: CreateTopicSchema, session: Session) -> TopicSchema:
@classmethod
def create(cls, data: CreateTopicSchema, session: Session) -> TopicSchema:
topic = Topic(id=data.id, created_at=datetime.utcnow())
session.add(topic)
return TopicSchema(id=topic.id, created_at=topic.created_at)

def get(self, id: str, session: Session) -> TopicSchema:
@classmethod
def get(cls, id: str, session: Session) -> TopicSchema:
topic = session.query(Topic).filter_by(id=id).first()
if topic is None:
raise NotFoundError("topic not found")
return TopicSchema(id=topic.id, created_at=topic.created_at)

def delete(self, id: str, session: Session) -> None:
self.get(id, session=session)
@classmethod
def delete(cls, id: str, session: Session) -> None:
cls.get(id, session=session)
session.query(Topic).filter_by(id=id).delete()


class QueueService:
@classmethod
def to_schema(cls, queue: Queue) -> QueueSchema:
return QueueSchema(
id=queue.id,
topic_id=queue.topic_id,
ack_deadline_seconds=queue.ack_deadline_seconds,
message_retention_seconds=queue.message_retention_seconds,
message_filters=queue.message_filters,
dead_letter_max_retries=queue.dead_letter_max_retries,
dead_letter_min_backoff_seconds=queue.dead_letter_min_backoff_seconds,
dead_letter_max_backoff_seconds=queue.dead_letter_max_backoff_seconds,
created_at=queue.created_at,
updated_at=queue.updated_at,
)

@classmethod
def create(cls, data: CreateQueueSchema, session: Session) -> QueueSchema:
topic = TopicService.get(data.topic_id, session=session)
now = datetime.utcnow()
queue = Queue(
id=data.id,
topic_id=topic.id,
ack_deadline_seconds=data.ack_deadline_seconds,
message_retention_seconds=data.message_retention_seconds,
message_filters=data.message_filters,
dead_letter_max_retries=data.dead_letter_max_retries,
dead_letter_min_backoff_seconds=data.dead_letter_min_backoff_seconds,
dead_letter_max_backoff_seconds=data.dead_letter_max_backoff_seconds,
created_at=now,
updated_at=now,
)
session.add(queue)
return cls.to_schema(queue)

@classmethod
def update(cls, id: str, data: UpdateQueueSchema, session: Session) -> QueueSchema:
topic = TopicService.get(data.topic_id, session=session)
queue = cls.get(id, session=session)
queue.topic_id = topic.id
queue.ack_deadline_seconds = data.ack_deadline_seconds
queue.message_retention_seconds = data.message_retention_seconds
queue.message_filters = data.message_filters
queue.dead_letter_max_retries = data.dead_letter_max_retries
queue.dead_letter_min_backoff_seconds = data.dead_letter_min_backoff_seconds
queue.dead_letter_max_backoff_seconds = data.dead_letter_max_backoff_seconds
queue.created_at = queue.created_at
queue.updated_at = datetime.utcnow()
return cls.to_schema(queue)

@classmethod
def get(cls, id: str, session: Session) -> QueueSchema:
queue = session.query(Queue).filter_by(id=id).first()
if queue is None:
raise NotFoundError("queue not found")
return cls.to_schema(queue)

@classmethod
def delete(cls, id: str, session: Session) -> None:
cls.get(id, session=session)
session.query(Queue).filter_by(id=id).delete()
101 changes: 85 additions & 16 deletions tests/test_services.py
Original file line number Diff line number Diff line change
@@ -1,48 +1,117 @@
import pytest

from fastqueue.exceptions import NotFoundError
from fastqueue.schemas import CreateTopicSchema
from fastqueue.services import TopicService
from fastqueue.schemas import CreateQueueSchema, CreateTopicSchema, UpdateQueueSchema
from fastqueue.services import QueueService, TopicService


def test_topic_service_create(session):
id = "my_topic"
data = CreateTopicSchema(id=id)
service = TopicService()

result = service.create(data, session=session)
result = TopicService.create(data, session=session)

assert result.id == id
assert result.created_at


def test_topic_service_get(session, topic):
service = TopicService()

result = service.get(topic.id, session=session)
result = TopicService.get(topic.id, session=session)

assert result.id == topic.id
assert result.created_at


def test_topic_service_get_not_found(session):
service = TopicService()

with pytest.raises(NotFoundError):
service.get("invalid-topic-name", session=session)
TopicService.get("invalid-topic-name", session=session)


def test_topic_service_delete(session, topic):
service = TopicService()

assert service.delete(topic.id, session=session) is None
assert TopicService.delete(topic.id, session=session) is None

with pytest.raises(NotFoundError):
service.get(topic.id, session=session)
TopicService.get(topic.id, session=session)


def test_topic_service_delete_not_found(session):
service = TopicService()
with pytest.raises(NotFoundError):
TopicService.delete("invalid-topic-name", session=session)


def test_queue_service_create(session, topic):
data = CreateQueueSchema(
id="my-queue",
topic_id=topic.id,
ack_deadline_seconds=30,
message_retention_seconds=604800,
message_filters={"attr1": "attr1"},
dead_letter_max_retries=5,
dead_letter_min_backoff_seconds=1,
dead_letter_max_backoff_seconds=5,
)

result = QueueService.create(data, session=session)

assert result.id == data.id
assert result.topic_id == data.topic_id
assert result.ack_deadline_seconds == data.ack_deadline_seconds
assert result.message_retention_seconds == data.message_retention_seconds
assert result.message_filters == data.message_filters
assert result.dead_letter_max_retries == data.dead_letter_max_retries
assert result.dead_letter_min_backoff_seconds == data.dead_letter_min_backoff_seconds
assert result.dead_letter_max_backoff_seconds == data.dead_letter_max_backoff_seconds
assert result.created_at
assert result.updated_at


def test_queue_service_update(session, queue):
data = UpdateQueueSchema(
topic_id=queue.topic_id,
ack_deadline_seconds=60,
message_retention_seconds=60,
message_filters=None,
dead_letter_max_retries=None,
dead_letter_min_backoff_seconds=None,
dead_letter_max_backoff_seconds=None,
)

result = QueueService.update(queue.id, data, session=session)

assert result.id == queue.id
assert result.topic_id == data.topic_id
assert result.ack_deadline_seconds == data.ack_deadline_seconds
assert result.message_retention_seconds == data.message_retention_seconds
assert result.message_filters == data.message_filters
assert result.dead_letter_max_retries == data.dead_letter_max_retries
assert result.dead_letter_min_backoff_seconds == data.dead_letter_min_backoff_seconds
assert result.dead_letter_max_backoff_seconds == data.dead_letter_max_backoff_seconds
assert result.created_at
assert result.updated_at


def test_queue_service_get(session, queue):
result = QueueService.get(queue.id, session=session)

assert result.id == queue.id
assert result.topic_id == queue.topic_id
assert result.ack_deadline_seconds == queue.ack_deadline_seconds
assert result.message_retention_seconds == queue.message_retention_seconds
assert result.message_filters == queue.message_filters
assert result.dead_letter_max_retries == queue.dead_letter_max_retries
assert result.dead_letter_min_backoff_seconds == queue.dead_letter_min_backoff_seconds
assert result.dead_letter_max_backoff_seconds == queue.dead_letter_max_backoff_seconds
assert result.created_at
assert result.updated_at


def test_queue_service_delete(session, queue):
assert QueueService.delete(queue.id, session=session) is None

with pytest.raises(NotFoundError):
QueueService.get(queue.id, session=session)


def test_queue_service_delete_not_found(session):
with pytest.raises(NotFoundError):
service.delete("invalid-topic-name", session=session)
QueueService.delete("invalid-queue-name", session=session)

0 comments on commit 29c9768

Please sign in to comment.