Skip to content

Commit

Permalink
feat: add worker entrypoint (#17)
Browse files Browse the repository at this point in the history
* feat: add worker entrypoint

* feat: add run_worker function
  • Loading branch information
allisson authored Sep 9, 2022
1 parent f15f30a commit 6627950
Show file tree
Hide file tree
Showing 10 changed files with 102 additions and 24 deletions.
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ run-server:
run-db-migrate:
poetry run python fastqueue/main.py db-migrate

run-worker:
poetry run python fastqueue/main.py worker

create-auto-migration:
poetry run alembic revision --autogenerate -m "Auto generated"

.PHONY: test lint run-db rm-db run-test-db rm-test-db build-image run-server run-db-migrate create-auto-migration
.PHONY: test lint run-db rm-db run-test-db rm-test-db build-image run-server run-db-migrate run-worker create-auto-migration
2 changes: 1 addition & 1 deletion env.sample
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
fastqueue_log_formatter='asctime=%(asctime)s level=%(levelname)s pathname=%(pathname)s line=%(lineno)s message=%(message)s'
fastqueue_log_level='debug'
fastqueue_log_json_format='false'
fastqueue_log_json_format='true'

fastqueue_debug='true'
fastqueue_server_host='127.0.0.1'
Expand Down
1 change: 1 addition & 0 deletions fastqueue/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class Settings(BaseSettings):
max_message_retention_seconds: int = 1209600
min_message_max_deliveries: int = 1
max_message_max_deliveries: int = 1000
queue_cleanup_interval_seconds: int = 60

class Config:
env_file = ".env"
Expand Down
6 changes: 6 additions & 0 deletions fastqueue/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from fastqueue.api import run_server
from fastqueue.database import run_migrations
from fastqueue.workers import run_worker

cli = typer.Typer()

Expand All @@ -16,5 +17,10 @@ def run_migrations_command() -> None:
return run_migrations()


@cli.command("worker")
def run_worker_command() -> None:
return run_worker()


if __name__ == "__main__":
cli()
7 changes: 0 additions & 7 deletions fastqueue/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,13 +242,6 @@ def create(cls, topic_id: str, data: CreateMessageSchema, session: Session) -> L
session.commit()
return result

@classmethod
def get(cls, id: str, session: Session) -> MessageSchema:
message = get_model(model=Message, filters={"id": id}, session=session)
if message is None:
raise NotFoundError("message not found")
return MessageSchema.from_orm(message)

@classmethod
def list_for_consume(cls, queue_id: str, limit: int, session: Session) -> ListMessageSchema:
queue = QueueService.get(id=queue_id, session=session)
Expand Down
27 changes: 27 additions & 0 deletions fastqueue/workers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from rocketry import Rocketry
from rocketry.conds import every

from fastqueue.config import settings
from fastqueue.database import SessionLocal
from fastqueue.logger import get_logger
from fastqueue.models import Queue
from fastqueue.services import QueueService

logger = get_logger(__name__)
worker = Rocketry()


@worker.task(every(f"{settings.queue_cleanup_interval_seconds} seconds"))
def queue_cleanup_task():
logger.info("starting queue_cleanup task")

with SessionLocal() as session:
for result in session.query(Queue.id):
queue_id = result[0]
QueueService.cleanup(id=queue_id, session=session)

logger.info("finishing queue_cleanup task")


def run_worker():
return worker.run(debug=settings.debug)
46 changes: 43 additions & 3 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ alembic = "^1.8.1"
psycopg2-binary = "^2.9.3"
typer = {extras = ["all"], version = "^0.6.1"}
python-json-logger = "^2.0.4"
rocketry = "^2.3.0"

[tool.poetry.group.dev.dependencies]
pytest = "^7"
Expand Down
12 changes: 0 additions & 12 deletions tests/test_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,18 +242,6 @@ def test_message_service_create(session, topic):
assert message.attributes == data.attributes


def test_message_service_get(session, message):
result = MessageService.get(id=message.id, session=session)

assert str(result.id) == message.id
assert result.queue_id == message.queue_id
assert result.data == message.data
assert result.attributes == message.attributes
assert result.delivery_attempts == message.delivery_attempts
assert result.created_at
assert result.updated_at


def test_message_service_list_for_consume(session, queue):
queue.ack_deadline_seconds = 1
queue.message_max_deliveries = None
Expand Down
19 changes: 19 additions & 0 deletions tests/test_workers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from datetime import datetime, timedelta

from fastqueue.models import Message
from fastqueue.workers import queue_cleanup_task
from tests.factories import MessageFactory


def test_queue_cleanup(session, queue):
messages = MessageFactory.build_batch(5, queue_id=queue.id)
for message in messages:
message.expired_at = datetime.utcnow() - timedelta(seconds=1)
session.add(message)
session.commit()

assert session.query(Message).filter_by(queue_id=queue.id).count() == 5

assert queue_cleanup_task() is None

assert session.query(Message).filter_by(queue_id=queue.id).count() == 0

0 comments on commit 6627950

Please sign in to comment.