diff --git a/alembic/versions/001_initial.py b/alembic/versions/001_initial.py index a9fb751..0c3d373 100644 --- a/alembic/versions/001_initial.py +++ b/alembic/versions/001_initial.py @@ -1,8 +1,8 @@ """Auto generated -Revision ID: ed7f6025b378 +Revision ID: 38d1359d1e34 Revises: -Create Date: 2022-09-02 19:37:18.428504 +Create Date: 2022-09-03 20:31:52.937422 """ import sqlalchemy as sa @@ -11,7 +11,7 @@ from alembic import op # revision identifiers, used by Alembic. -revision = "ed7f6025b378" +revision = "38d1359d1e34" down_revision = None branch_labels = None depends_on = None @@ -32,16 +32,11 @@ def upgrade() -> None: sa.Column("ack_deadline_seconds", sa.Integer(), nullable=False), sa.Column("message_retention_seconds", sa.Integer(), nullable=False), sa.Column("message_filters", postgresql.JSONB(astext_type=sa.Text()), nullable=True), - sa.Column("dead_letter_queue_id", sa.String(length=128), nullable=True), sa.Column("dead_letter_max_retries", sa.Integer(), nullable=True), sa.Column("dead_letter_min_backoff_seconds", sa.Integer(), nullable=True), sa.Column("dead_letter_max_backoff_seconds", sa.Integer(), nullable=True), sa.Column("created_at", sa.DateTime(), nullable=False), sa.Column("updated_at", sa.DateTime(), nullable=False), - sa.ForeignKeyConstraint( - ["dead_letter_queue_id"], - ["queues.id"], - ), sa.ForeignKeyConstraint( ["topic_id"], ["topics.id"], diff --git a/fastqueue/models.py b/fastqueue/models.py index 5a5fdf0..a0d9203 100644 --- a/fastqueue/models.py +++ b/fastqueue/models.py @@ -24,9 +24,6 @@ class Queue(Base): ack_deadline_seconds = sqlalchemy.Column(sqlalchemy.Integer, nullable=False) message_retention_seconds = sqlalchemy.Column(sqlalchemy.Integer, nullable=False) message_filters = sqlalchemy.Column(postgresql.JSONB, nullable=True) - dead_letter_queue_id = sqlalchemy.Column( - sqlalchemy.String(length=128), sqlalchemy.ForeignKey("queues.id"), nullable=True - ) dead_letter_max_retries = sqlalchemy.Column(sqlalchemy.Integer, nullable=True) dead_letter_min_backoff_seconds = sqlalchemy.Column(sqlalchemy.Integer, nullable=True) dead_letter_max_backoff_seconds = sqlalchemy.Column(sqlalchemy.Integer, nullable=True) diff --git a/fastqueue/schemas.py b/fastqueue/schemas.py index e18d0b6..0b15b8d 100644 --- a/fastqueue/schemas.py +++ b/fastqueue/schemas.py @@ -22,7 +22,6 @@ class CreateQueueSchema(Schema): ack_deadline_seconds: int message_retention_seconds: int message_filters: dict | None = None - dead_letter_queue_id: str | None = Field(None, regex=regex_for_id, max_length=128) dead_letter_max_retries: int | None = None dead_letter_min_backoff_seconds: int | None = None dead_letter_max_backoff_seconds: int | None = None @@ -33,7 +32,6 @@ class UpdateQueueSchema(Schema): ack_deadline_seconds: int message_retention_seconds: int message_filters: dict | None = None - dead_letter_queue_id: str | None = Field(None, regex=regex_for_id, max_length=128) dead_letter_max_retries: int | None = None dead_letter_min_backoff_seconds: int | None = None dead_letter_max_backoff_seconds: int | None = None @@ -45,7 +43,6 @@ class QueueSchema(Schema): ack_deadline_seconds: int message_retention_seconds: int message_filters: dict | None = None - dead_letter_queue_id: str | None = Field(None, regex=regex_for_id, max_length=128) dead_letter_max_retries: int | None = None dead_letter_min_backoff_seconds: int | None = None dead_letter_max_backoff_seconds: int | None = None diff --git a/fastqueue/services.py b/fastqueue/services.py index 4b757ed..87804ec 100644 --- a/fastqueue/services.py +++ b/fastqueue/services.py @@ -3,22 +3,94 @@ from sqlalchemy.orm import Session from fastqueue.exceptions import NotFoundError -from fastqueue.models import Topic -from fastqueue.schemas import CreateTopicSchema, TopicSchema +from fastqueue.models import Queue, Topic +from fastqueue.schemas import ( + CreateQueueSchema, + CreateTopicSchema, + QueueSchema, + TopicSchema, + UpdateQueueSchema, +) class TopicService: - def create(self, data: CreateTopicSchema, session: Session) -> TopicSchema: + @classmethod + def create(cls, data: CreateTopicSchema, session: Session) -> TopicSchema: topic = Topic(id=data.id, created_at=datetime.utcnow()) session.add(topic) return TopicSchema(id=topic.id, created_at=topic.created_at) - def get(self, id: str, session: Session) -> TopicSchema: + @classmethod + def get(cls, id: str, session: Session) -> TopicSchema: topic = session.query(Topic).filter_by(id=id).first() if topic is None: raise NotFoundError("topic not found") return TopicSchema(id=topic.id, created_at=topic.created_at) - def delete(self, id: str, session: Session) -> None: - self.get(id, session=session) + @classmethod + def delete(cls, id: str, session: Session) -> None: + cls.get(id, session=session) session.query(Topic).filter_by(id=id).delete() + + +class QueueService: + @classmethod + def to_schema(cls, queue: Queue) -> QueueSchema: + return QueueSchema( + id=queue.id, + topic_id=queue.topic_id, + ack_deadline_seconds=queue.ack_deadline_seconds, + message_retention_seconds=queue.message_retention_seconds, + message_filters=queue.message_filters, + dead_letter_max_retries=queue.dead_letter_max_retries, + dead_letter_min_backoff_seconds=queue.dead_letter_min_backoff_seconds, + dead_letter_max_backoff_seconds=queue.dead_letter_max_backoff_seconds, + created_at=queue.created_at, + updated_at=queue.updated_at, + ) + + @classmethod + def create(cls, data: CreateQueueSchema, session: Session) -> QueueSchema: + topic = TopicService.get(data.topic_id, session=session) + now = datetime.utcnow() + queue = Queue( + id=data.id, + topic_id=topic.id, + ack_deadline_seconds=data.ack_deadline_seconds, + message_retention_seconds=data.message_retention_seconds, + message_filters=data.message_filters, + dead_letter_max_retries=data.dead_letter_max_retries, + dead_letter_min_backoff_seconds=data.dead_letter_min_backoff_seconds, + dead_letter_max_backoff_seconds=data.dead_letter_max_backoff_seconds, + created_at=now, + updated_at=now, + ) + session.add(queue) + return cls.to_schema(queue) + + @classmethod + def update(cls, id: str, data: UpdateQueueSchema, session: Session) -> QueueSchema: + topic = TopicService.get(data.topic_id, session=session) + queue = cls.get(id, session=session) + queue.topic_id = topic.id + queue.ack_deadline_seconds = data.ack_deadline_seconds + queue.message_retention_seconds = data.message_retention_seconds + queue.message_filters = data.message_filters + queue.dead_letter_max_retries = data.dead_letter_max_retries + queue.dead_letter_min_backoff_seconds = data.dead_letter_min_backoff_seconds + queue.dead_letter_max_backoff_seconds = data.dead_letter_max_backoff_seconds + queue.created_at = queue.created_at + queue.updated_at = datetime.utcnow() + return cls.to_schema(queue) + + @classmethod + def get(cls, id: str, session: Session) -> QueueSchema: + queue = session.query(Queue).filter_by(id=id).first() + if queue is None: + raise NotFoundError("queue not found") + return cls.to_schema(queue) + + @classmethod + def delete(cls, id: str, session: Session) -> None: + cls.get(id, session=session) + session.query(Queue).filter_by(id=id).delete() diff --git a/tests/test_services.py b/tests/test_services.py index 7396b9b..9e0c7ad 100644 --- a/tests/test_services.py +++ b/tests/test_services.py @@ -1,48 +1,117 @@ import pytest from fastqueue.exceptions import NotFoundError -from fastqueue.schemas import CreateTopicSchema -from fastqueue.services import TopicService +from fastqueue.schemas import CreateQueueSchema, CreateTopicSchema, UpdateQueueSchema +from fastqueue.services import QueueService, TopicService def test_topic_service_create(session): id = "my_topic" data = CreateTopicSchema(id=id) - service = TopicService() - result = service.create(data, session=session) + result = TopicService.create(data, session=session) assert result.id == id assert result.created_at def test_topic_service_get(session, topic): - service = TopicService() - - result = service.get(topic.id, session=session) + result = TopicService.get(topic.id, session=session) assert result.id == topic.id assert result.created_at def test_topic_service_get_not_found(session): - service = TopicService() - with pytest.raises(NotFoundError): - service.get("invalid-topic-name", session=session) + TopicService.get("invalid-topic-name", session=session) def test_topic_service_delete(session, topic): - service = TopicService() - - assert service.delete(topic.id, session=session) is None + assert TopicService.delete(topic.id, session=session) is None with pytest.raises(NotFoundError): - service.get(topic.id, session=session) + TopicService.get(topic.id, session=session) def test_topic_service_delete_not_found(session): - service = TopicService() + with pytest.raises(NotFoundError): + TopicService.delete("invalid-topic-name", session=session) + + +def test_queue_service_create(session, topic): + data = CreateQueueSchema( + id="my-queue", + topic_id=topic.id, + ack_deadline_seconds=30, + message_retention_seconds=604800, + message_filters={"attr1": "attr1"}, + dead_letter_max_retries=5, + dead_letter_min_backoff_seconds=1, + dead_letter_max_backoff_seconds=5, + ) + + result = QueueService.create(data, session=session) + + assert result.id == data.id + assert result.topic_id == data.topic_id + assert result.ack_deadline_seconds == data.ack_deadline_seconds + assert result.message_retention_seconds == data.message_retention_seconds + assert result.message_filters == data.message_filters + assert result.dead_letter_max_retries == data.dead_letter_max_retries + assert result.dead_letter_min_backoff_seconds == data.dead_letter_min_backoff_seconds + assert result.dead_letter_max_backoff_seconds == data.dead_letter_max_backoff_seconds + assert result.created_at + assert result.updated_at + + +def test_queue_service_update(session, queue): + data = UpdateQueueSchema( + topic_id=queue.topic_id, + ack_deadline_seconds=60, + message_retention_seconds=60, + message_filters=None, + dead_letter_max_retries=None, + dead_letter_min_backoff_seconds=None, + dead_letter_max_backoff_seconds=None, + ) + + result = QueueService.update(queue.id, data, session=session) + + assert result.id == queue.id + assert result.topic_id == data.topic_id + assert result.ack_deadline_seconds == data.ack_deadline_seconds + assert result.message_retention_seconds == data.message_retention_seconds + assert result.message_filters == data.message_filters + assert result.dead_letter_max_retries == data.dead_letter_max_retries + assert result.dead_letter_min_backoff_seconds == data.dead_letter_min_backoff_seconds + assert result.dead_letter_max_backoff_seconds == data.dead_letter_max_backoff_seconds + assert result.created_at + assert result.updated_at + + +def test_queue_service_get(session, queue): + result = QueueService.get(queue.id, session=session) + + assert result.id == queue.id + assert result.topic_id == queue.topic_id + assert result.ack_deadline_seconds == queue.ack_deadline_seconds + assert result.message_retention_seconds == queue.message_retention_seconds + assert result.message_filters == queue.message_filters + assert result.dead_letter_max_retries == queue.dead_letter_max_retries + assert result.dead_letter_min_backoff_seconds == queue.dead_letter_min_backoff_seconds + assert result.dead_letter_max_backoff_seconds == queue.dead_letter_max_backoff_seconds + assert result.created_at + assert result.updated_at + + +def test_queue_service_delete(session, queue): + assert QueueService.delete(queue.id, session=session) is None + + with pytest.raises(NotFoundError): + QueueService.get(queue.id, session=session) + +def test_queue_service_delete_not_found(session): with pytest.raises(NotFoundError): - service.delete("invalid-topic-name", session=session) + QueueService.delete("invalid-queue-name", session=session)