Skip to content

Commit

Permalink
🩺 add celery healthcheck
Browse files Browse the repository at this point in the history
  • Loading branch information
sjoerdie committed Jul 26, 2024
1 parent 3352a74 commit 9481c2b
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 6 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ COPY ./bin/celery_worker.sh /celery_worker.sh
COPY ./bin/celery_flower.sh /celery_flower.sh
COPY ./bin/check_celery_worker_liveness.py ./bin/
COPY ./bin/setup_configuration.sh /setup_configuration.sh
RUN mkdir /app/log /app/config
RUN mkdir /app/log /app/config /app/tmp

# copy frontend build statics
COPY --from=frontend-build /app/src/objects/static /app/src/objects/static
Expand Down
6 changes: 4 additions & 2 deletions bin/celery_worker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ if [[ "$ENABLE_COVERAGE" ]]; then
fi

echo "Starting celery worker $WORKER_NAME with queue $QUEUE"
exec $_binary --workdir src --app objects.celery worker \
exec $_binary --workdir src --app "objects.celery" worker \
-Q $QUEUE \
-n $WORKER_NAME \
-l $LOGLEVEL \
-O fair \
-c $CONCURRENCY
-c $CONCURRENCY \
-E \
--max-tasks-per-child=50
25 changes: 23 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@ services:
- DEMO_TOKEN=demo-random-string
- DEMO_PERSON=Demo
- [email protected]
healthcheck:
test: ["CMD", "python", "-c", "import requests; exit(requests.head('http://localhost:8000/admin/').status_code not in [200, 302])"]
interval: 30s
timeout: 5s
retries: 3
# This should allow for enough time for migrations to run before the max
# retries have passed. This healthcheck in turn allows other containers
# to wait for the database migrations.
start_period: 30s
ports:
- 8000:8000
depends_on:
Expand All @@ -59,9 +68,21 @@ services:
build: *web_build
environment: *web_env
command: /celery_worker.sh
healthcheck:
test: ["CMD", "python", "/app/bin/check_celery_worker_liveness.py"]
interval: 30s
timeout: 5s
retries: 3
start_period: 10s
depends_on:
- db
- redis
web:
# This health check condition is needed because Celery Beat will
# try to convert the CELERY_BEAT_SCHEDULE into database entries. For
# this, migrations need to be finished. If Celery tasks were still
# pending, the database also needs to be ready for Celery itself. We
# therefore have the health check here, and make Celery beat and
# monitor containers depend on the celery container.
condition: service_healthy
volumes: *web_volumes

celery-flower:
Expand Down
65 changes: 64 additions & 1 deletion src/objects/celery.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,72 @@
from celery import Celery
from pathlib import Path

from django.conf import settings

from celery import Celery, bootsteps
from celery.signals import setup_logging, worker_ready, worker_shutdown

from .setup import setup_env

setup_env()

app = Celery("objects")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.conf.ONCE = {
"backend": "celery_once.backends.Redis",
"settings": {
"url": settings.CELERY_BROKER_URL,
"default_timeout": 60 * 60, # one hour
},
}

app.autodiscover_tasks()


# Use django's logging settings as these are reset by Celery by default
@setup_logging.connect()
def config_loggers(*args, **kwargs):
from logging.config import dictConfig

dictConfig(settings.LOGGING)


HEARTBEAT_FILE = Path(settings.BASE_DIR) / "tmp" / "celery_worker_heartbeat"
READINESS_FILE = Path(settings.BASE_DIR) / "tmp" / "celery_worker_ready"


#
# Utilities for checking the health of celery workers
#
class LivenessProbe(bootsteps.StartStopStep):
requires = {"celery.worker.components:Timer"}

def __init__(self, worker, **kwargs):
self.requests = []
self.tref = None

def start(self, worker):
self.tref = worker.timer.call_repeatedly(
10.0,
self.update_heartbeat_file,
(worker,),
priority=10,
)

def stop(self, worker):
HEARTBEAT_FILE.unlink(missing_ok=True)

def update_heartbeat_file(self, worker):
HEARTBEAT_FILE.touch()


@worker_ready.connect
def worker_ready(**_):
READINESS_FILE.touch()


@worker_shutdown.connect
def worker_shutdown(**_):
READINESS_FILE.unlink(missing_ok=True)


app.steps["worker"].add(LivenessProbe)

0 comments on commit 9481c2b

Please sign in to comment.