Skip to content

Commit

Permalink
feat: add MessageService (#9)
Browse files Browse the repository at this point in the history
  • Loading branch information
allisson authored Sep 5, 2022
1 parent 29c9768 commit bf328bf
Show file tree
Hide file tree
Showing 6 changed files with 385 additions and 51 deletions.
10 changes: 6 additions & 4 deletions alembic/versions/001_initial.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
"""Auto generated
Revision ID: 38d1359d1e34
Revision ID: 95c37e429139
Revises:
Create Date: 2022-09-03 20:31:52.937422
Create Date: 2022-09-05 08:04:45.333771
"""
import sqlalchemy as sa
Expand All @@ -11,7 +11,7 @@
from alembic import op

# revision identifiers, used by Alembic.
revision = "38d1359d1e34"
revision = "95c37e429139"
down_revision = None
branch_labels = None
depends_on = None
Expand All @@ -28,7 +28,7 @@ def upgrade() -> None:
op.create_table(
"queues",
sa.Column("id", sa.String(length=128), nullable=False),
sa.Column("topic_id", sa.String(length=128), nullable=False),
sa.Column("topic_id", sa.String(length=128), nullable=True),
sa.Column("ack_deadline_seconds", sa.Integer(), nullable=False),
sa.Column("message_retention_seconds", sa.Integer(), nullable=False),
sa.Column("message_filters", postgresql.JSONB(astext_type=sa.Text()), nullable=True),
Expand All @@ -53,6 +53,8 @@ def upgrade() -> None:
sa.Column("delivery_attempts", sa.Integer(), nullable=False),
sa.Column("expired_at", sa.DateTime(), nullable=False),
sa.Column("scheduled_at", sa.DateTime(), nullable=False),
sa.Column("acked", sa.Boolean(), nullable=False),
sa.Column("dead", sa.Boolean(), nullable=False),
sa.Column("created_at", sa.DateTime(), nullable=False),
sa.Column("updated_at", sa.DateTime(), nullable=False),
sa.ForeignKeyConstraint(
Expand Down
4 changes: 3 additions & 1 deletion fastqueue/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class Queue(Base):

id = sqlalchemy.Column(sqlalchemy.String(length=128), primary_key=True, nullable=False)
topic_id = sqlalchemy.Column(
sqlalchemy.String(length=128), sqlalchemy.ForeignKey("topics.id"), index=True, nullable=False
sqlalchemy.String(length=128), sqlalchemy.ForeignKey("topics.id"), index=True, nullable=True
)
ack_deadline_seconds = sqlalchemy.Column(sqlalchemy.Integer, nullable=False)
message_retention_seconds = sqlalchemy.Column(sqlalchemy.Integer, nullable=False)
Expand Down Expand Up @@ -53,6 +53,8 @@ class Message(Base):
delivery_attempts = sqlalchemy.Column(sqlalchemy.Integer, nullable=False)
expired_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False)
scheduled_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False)
acked = sqlalchemy.Column(sqlalchemy.Boolean, nullable=False)
dead = sqlalchemy.Column(sqlalchemy.Boolean, nullable=False)
created_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False)
updated_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False)

Expand Down
61 changes: 48 additions & 13 deletions fastqueue/schemas.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import datetime
from typing import Any
from uuid import UUID

from pydantic import BaseModel as Schema
Expand All @@ -12,54 +13,88 @@ class CreateTopicSchema(Schema):


class TopicSchema(Schema):
id: str = Field(..., regex=regex_for_id, max_length=128)
id: str
created_at: datetime

class Config:
orm_mode = True


class ListTopicSchema(Schema):
data: list[TopicSchema]
offset: int | None
limit: int | None


class CreateQueueSchema(Schema):
id: str = Field(..., regex=regex_for_id, max_length=128)
topic_id: str = Field(..., regex=regex_for_id, max_length=128)
ack_deadline_seconds: int
message_retention_seconds: int
message_filters: dict | None = None
topic_id: str | None = Field(None, regex=regex_for_id, max_length=128)
ack_deadline_seconds: int = Field(30, ge=0, le=600)
message_retention_seconds: int = Field(1209600, ge=600, le=1209600)
message_filters: dict[str, list[Any]] | None = None
dead_letter_max_retries: int | None = None
dead_letter_min_backoff_seconds: int | None = None
dead_letter_max_backoff_seconds: int | None = None


class UpdateQueueSchema(Schema):
topic_id: str = Field(..., regex=regex_for_id, max_length=128)
ack_deadline_seconds: int
message_retention_seconds: int
message_filters: dict | None = None
topic_id: str | None = Field(None, regex=regex_for_id, max_length=128)
ack_deadline_seconds: int = Field(..., ge=0, le=600)
message_retention_seconds: int = Field(..., ge=600, le=1209600)
message_filters: dict[str, list[Any]] | None = None
dead_letter_max_retries: int | None = None
dead_letter_min_backoff_seconds: int | None = None
dead_letter_max_backoff_seconds: int | None = None


class QueueSchema(Schema):
id: str = Field(..., regex=regex_for_id, max_length=128)
topic_id: str = Field(..., regex=regex_for_id, max_length=128)
id: str
topic_id: str | None
ack_deadline_seconds: int
message_retention_seconds: int
message_filters: dict | None = None
message_filters: dict[str, list[Any]] | None = None
dead_letter_max_retries: int | None = None
dead_letter_min_backoff_seconds: int | None = None
dead_letter_max_backoff_seconds: int | None = None
created_at: datetime
updated_at: datetime

class Config:
orm_mode = True


class ListQueueSchema(Schema):
data: list[QueueSchema]
offset: int | None
limit: int | None


class CreateMessageSchema(Schema):
data: dict
attributes: dict | None = None


class UpdateMessageSchema(Schema):
delivery_attempts: int
scheduled_at: datetime
acked: bool
dead: bool


class MessageSchema(Schema):
id: UUID
queue_id: str = Field(..., regex=regex_for_id, max_length=128)
queue_id: str
data: dict
attributes: dict | None = None
delivery_attempts: int
created_at: datetime
updated_at: datetime

class Config:
orm_mode = True


class ListMessageSchema(Schema):
data: list[MessageSchema]
offset: int | None
limit: int | None
173 changes: 144 additions & 29 deletions fastqueue/services.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,63 @@
from datetime import datetime
import uuid
from datetime import datetime, timedelta
from typing import Any

from sqlalchemy.orm import Session

from fastqueue.exceptions import NotFoundError
from fastqueue.models import Queue, Topic
from fastqueue.models import Message, Queue, Topic
from fastqueue.schemas import (
CreateMessageSchema,
CreateQueueSchema,
CreateTopicSchema,
ListMessageSchema,
ListQueueSchema,
ListTopicSchema,
MessageSchema,
QueueSchema,
TopicSchema,
UpdateMessageSchema,
UpdateQueueSchema,
)


def apply_basic_filters(query: Any, filters: dict | None, offset: int | None, limit: int | None) -> Any:
if filters is not None:
query = query.filter_by(**filters)
if offset is not None:
query = query.offset(offset)
if limit is not None:
query = query.limit(limit)
return query


class TopicService:
@classmethod
def create(cls, data: CreateTopicSchema, session: Session) -> TopicSchema:
topic = Topic(id=data.id, created_at=datetime.utcnow())
session.add(topic)
return TopicSchema(id=topic.id, created_at=topic.created_at)
return TopicSchema.from_orm(topic)

@classmethod
def get_model(cls, id: str, session: Session) -> Topic | None:
return session.query(Topic).filter_by(id=id).first()

@classmethod
def get(cls, id: str, session: Session) -> TopicSchema:
topic = session.query(Topic).filter_by(id=id).first()
topic = cls.get_model(id=id, session=session)
if topic is None:
raise NotFoundError("topic not found")
return TopicSchema(id=topic.id, created_at=topic.created_at)
return TopicSchema.from_orm(topic)

@classmethod
def list(
cls, filters: dict | None, offset: int | None, limit: int | None, session: Session
) -> ListTopicSchema:
topics = session.query(Topic).order_by(Topic.id)
topics = apply_basic_filters(topics, filters, offset, limit)
return ListTopicSchema(
data=[TopicSchema.from_orm(topic) for topic in topics.all()], offset=offset, limit=limit
)

@classmethod
def delete(cls, id: str, session: Session) -> None:
Expand All @@ -34,28 +66,15 @@ def delete(cls, id: str, session: Session) -> None:


class QueueService:
@classmethod
def to_schema(cls, queue: Queue) -> QueueSchema:
return QueueSchema(
id=queue.id,
topic_id=queue.topic_id,
ack_deadline_seconds=queue.ack_deadline_seconds,
message_retention_seconds=queue.message_retention_seconds,
message_filters=queue.message_filters,
dead_letter_max_retries=queue.dead_letter_max_retries,
dead_letter_min_backoff_seconds=queue.dead_letter_min_backoff_seconds,
dead_letter_max_backoff_seconds=queue.dead_letter_max_backoff_seconds,
created_at=queue.created_at,
updated_at=queue.updated_at,
)

@classmethod
def create(cls, data: CreateQueueSchema, session: Session) -> QueueSchema:
topic = TopicService.get(data.topic_id, session=session)
if data.topic_id is not None:
TopicService.get(data.topic_id, session=session)

now = datetime.utcnow()
queue = Queue(
id=data.id,
topic_id=topic.id,
topic_id=data.topic_id,
ack_deadline_seconds=data.ack_deadline_seconds,
message_retention_seconds=data.message_retention_seconds,
message_filters=data.message_filters,
Expand All @@ -66,13 +85,15 @@ def create(cls, data: CreateQueueSchema, session: Session) -> QueueSchema:
updated_at=now,
)
session.add(queue)
return cls.to_schema(queue)
return QueueSchema.from_orm(queue)

@classmethod
def update(cls, id: str, data: UpdateQueueSchema, session: Session) -> QueueSchema:
topic = TopicService.get(data.topic_id, session=session)
queue = cls.get(id, session=session)
queue.topic_id = topic.id
if data.topic_id is not None:
TopicService.get(data.topic_id, session=session)

queue = cls.get_model(id, session=session)
queue.topic_id = data.topic_id
queue.ack_deadline_seconds = data.ack_deadline_seconds
queue.message_retention_seconds = data.message_retention_seconds
queue.message_filters = data.message_filters
Expand All @@ -81,16 +102,110 @@ def update(cls, id: str, data: UpdateQueueSchema, session: Session) -> QueueSche
queue.dead_letter_max_backoff_seconds = data.dead_letter_max_backoff_seconds
queue.created_at = queue.created_at
queue.updated_at = datetime.utcnow()
return cls.to_schema(queue)
return QueueSchema.from_orm(queue)

@classmethod
def get_model(cls, id: str, session: Session) -> Queue:
return session.query(Queue).filter_by(id=id).first()

@classmethod
def get(cls, id: str, session: Session) -> QueueSchema:
queue = session.query(Queue).filter_by(id=id).first()
queue = cls.get_model(id=id, session=session)
if queue is None:
raise NotFoundError("queue not found")
return cls.to_schema(queue)
return QueueSchema.from_orm(queue)

@classmethod
def list(
cls, filters: dict | None, offset: int | None, limit: int | None, session: Session
) -> ListQueueSchema:
queues = session.query(Queue).order_by(Queue.id)
queues = apply_basic_filters(queues, filters, offset, limit)
return ListQueueSchema(
data=[QueueSchema.from_orm(queue) for queue in queues.all()], offset=offset, limit=limit
)

@classmethod
def delete(cls, id: str, session: Session) -> None:
cls.get(id, session=session)
session.query(Queue).filter_by(id=id).delete()


class MessageService:
@classmethod
def _should_message_be_created_on_queue(
cls, queue_filters: dict[str, list[Any]] | None, message_attributes: dict | None
) -> bool:
if queue_filters is not None:
if message_attributes is None:
return False

keys = set(queue_filters.keys()).intersection(set(message_attributes.keys()))
if len(keys) != len(queue_filters.keys()):
return False

for key in keys:
if message_attributes[key] not in queue_filters[key]:
return False

return True

@classmethod
def create(cls, topic_id: str, data: CreateMessageSchema, session: Session) -> ListMessageSchema:
result = ListMessageSchema(data=[], offset=None, limit=None)
topic = TopicService.get(topic_id, session=session)
queues = QueueService.list(filters={"topic_id": topic.id}, offset=None, limit=None, session=session)
if not queues.data:
return result

for queue in queues.data:
if cls._should_message_be_created_on_queue(queue.message_filters, data.attributes) is False:
continue

now = datetime.utcnow()
message = Message(
id=uuid.uuid4().hex,
queue_id=queue.id,
data=data.data,
attributes=data.attributes,
delivery_attempts=0,
expired_at=now + timedelta(seconds=queue.message_retention_seconds),
scheduled_at=now,
created_at=now,
updated_at=now,
)
session.add(message)
result.data.append(MessageSchema.from_orm(message))

return result

@classmethod
def update(cls, id: str, data: UpdateMessageSchema, session: Session) -> MessageSchema:
message = cls.get_model(id, session=session)
message.delivery_attempts = data.delivery_attempts
message.scheduled_at = data.scheduled_at
message.acked = data.acked
message.dead = data.dead
message.updated_at = datetime.utcnow()
return MessageSchema.from_orm(message)

@classmethod
def get_model(cls, id: str, session: Session) -> Message:
return session.query(Message).filter_by(id=id).first()

@classmethod
def get(cls, id: str, session: Session) -> MessageSchema:
message = cls.get_model(id=id, session=session)
if message is None:
raise NotFoundError("message not found")
return MessageSchema.from_orm(message)

@classmethod
def list(
cls, filters: dict | None, offset: int | None, limit: int | None, session: Session
) -> ListMessageSchema:
messages = session.query(Message).order_by(Message.created_at)
messages = apply_basic_filters(messages, filters, offset, limit)
return ListMessageSchema(
data=[MessageSchema.from_orm(message) for message in messages.all()], offset=offset, limit=limit
)
Loading

0 comments on commit bf328bf

Please sign in to comment.