diff --git a/Dockerfile b/Dockerfile index f82901c9..ba141e4b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -57,7 +57,7 @@ COPY ./bin/celery_worker.sh /celery_worker.sh COPY ./bin/celery_flower.sh /celery_flower.sh COPY ./bin/check_celery_worker_liveness.py ./bin/ COPY ./bin/setup_configuration.sh /setup_configuration.sh -RUN mkdir /app/log /app/config +RUN mkdir /app/log /app/config /app/tmp # copy frontend build statics COPY --from=frontend-build /app/src/objects/static /app/src/objects/static diff --git a/bin/celery_worker.sh b/bin/celery_worker.sh index 33fdf84a..031a2ec1 100755 --- a/bin/celery_worker.sh +++ b/bin/celery_worker.sh @@ -15,9 +15,11 @@ if [[ "$ENABLE_COVERAGE" ]]; then fi echo "Starting celery worker $WORKER_NAME with queue $QUEUE" -exec $_binary --workdir src --app objects.celery worker \ +exec $_binary --workdir src --app "objects.celery" worker \ -Q $QUEUE \ -n $WORKER_NAME \ -l $LOGLEVEL \ -O fair \ - -c $CONCURRENCY + -c $CONCURRENCY \ + -E \ + --max-tasks-per-child=50 \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 7ff54493..cf49c46c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -36,6 +36,15 @@ services: - DEMO_TOKEN=demo-random-string - DEMO_PERSON=Demo - DEMO_EMAIL=demo@demo.local + healthcheck: + test: ["CMD", "python", "-c", "import requests; exit(requests.head('http://localhost:8000/admin/').status_code not in [200, 302])"] + interval: 30s + timeout: 5s + retries: 3 + # This should allow for enough time for migrations to run before the max + # retries have passed. This healthcheck in turn allows other containers + # to wait for the database migrations. + start_period: 30s ports: - 8000:8000 depends_on: @@ -59,9 +68,15 @@ services: build: *web_build environment: *web_env command: /celery_worker.sh + healthcheck: + test: ["CMD", "python", "/app/bin/check_celery_worker_liveness.py"] + interval: 30s + timeout: 5s + retries: 3 + start_period: 10s depends_on: - - db - - redis + web: + condition: service_healthy volumes: *web_volumes celery-flower: diff --git a/src/objects/celery.py b/src/objects/celery.py index f5c5e0e0..5103a886 100644 --- a/src/objects/celery.py +++ b/src/objects/celery.py @@ -1,4 +1,9 @@ -from celery import Celery +from pathlib import Path + +from django.conf import settings + +from celery import Celery, bootsteps +from celery.signals import setup_logging, worker_ready, worker_shutdown from .setup import setup_env @@ -6,4 +11,62 @@ app = Celery("objects") app.config_from_object("django.conf:settings", namespace="CELERY") +app.conf.ONCE = { + "backend": "celery_once.backends.Redis", + "settings": { + "url": settings.CELERY_BROKER_URL, + "default_timeout": 60 * 60, # one hour + }, +} + app.autodiscover_tasks() + + +# Use django's logging settings as these are reset by Celery by default +@setup_logging.connect() +def config_loggers(*args, **kwargs): + from logging.config import dictConfig + + dictConfig(settings.LOGGING) + + +HEARTBEAT_FILE = Path(settings.BASE_DIR) / "tmp" / "celery_worker_heartbeat" +READINESS_FILE = Path(settings.BASE_DIR) / "tmp" / "celery_worker_ready" + + +# +# Utilities for checking the health of celery workers +# +class LivenessProbe(bootsteps.StartStopStep): + requires = {"celery.worker.components:Timer"} + + def __init__(self, worker, **kwargs): + self.requests = [] + self.tref = None + + def start(self, worker): + self.tref = worker.timer.call_repeatedly( + 10.0, + self.update_heartbeat_file, + (worker,), + priority=10, + ) + + def stop(self, worker): + HEARTBEAT_FILE.unlink(missing_ok=True) + + def update_heartbeat_file(self, worker): + HEARTBEAT_FILE.touch() + + +@worker_ready.connect +def worker_ready(**_): + READINESS_FILE.touch() + + +@worker_shutdown.connect +def worker_shutdown(**_): + READINESS_FILE.unlink(missing_ok=True) + + +app.steps["worker"].add(LivenessProbe)