Skip to content

Commit

Permalink
[#2322] Added single-run enforcement the task, added test
Browse files Browse the repository at this point in the history
  • Loading branch information
Bart van der Schoor committed May 13, 2024
1 parent 317f7f6 commit e87ed3e
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 4 deletions.
1 change: 1 addition & 0 deletions requirements/base.in
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ django-sessionprofile
celery ~= 5.0
celery-once
django-celery-beat
-e git+https://github.com/maykinmedia/django-celery-monitor@513dc28#egg=django_celery_monitor

# Common Ground integration
zgw-consumers
Expand Down
4 changes: 3 additions & 1 deletion src/open_inwoner/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
from .celery import app as celery_app # noqa
from .celery import app as celery_app

__all__ = ("celery_app",)
3 changes: 2 additions & 1 deletion src/open_inwoner/conf/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,6 @@
"mailer",
"log_outgoing_requests",
"formtools",
"django_celery_beat",
"django_setup_configuration",
# Project applications.
"open_inwoner.components",
Expand Down Expand Up @@ -236,6 +235,8 @@
"open_inwoner.cms.plugins",
"open_inwoner.cms.benefits",
"djchoices",
"django_celery_beat",
"django_celery_monitor",
]

MIDDLEWARE = [
Expand Down
16 changes: 16 additions & 0 deletions src/open_inwoner/openzaak/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import logging

from django.core.management import call_command

from open_inwoner.celery import QueueOnce, app

logger = logging.getLogger(__name__)


@app.task(base=QueueOnce, once={"keys": []})
def import_zgw_data():
logger.info(f"starting import_zgw_data() task")

call_command("zgw_import_data")

logger.info(f"finished import_zgw_data() task")
52 changes: 52 additions & 0 deletions src/open_inwoner/openzaak/tests/test_zgw_imports_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import uuid
from unittest.mock import Mock, patch

from django.test import TestCase

from celery_once.tasks import AlreadyQueued

from open_inwoner.celery import app as celery_app
from open_inwoner.openzaak.tasks import import_zgw_data
from open_inwoner.utils.test import ClearCachesMixin


class ZGWImportTest(ClearCachesMixin, TestCase):
@classmethod
def setUpClass(cls):
super().setUpClass()
# manually patch the conf: it is a dynamic object that is hard to patch
cls._old_eager = celery_app.conf.task_always_eager
celery_app.conf.task_always_eager = False

@classmethod
def tearDownClass(cls):
super().tearDownClass()
celery_app.conf.task_always_eager = cls._old_eager

@patch("open_inwoner.openzaak.tasks.call_command")
def test_task_calls_command(self, mock_call: Mock):
import_zgw_data()
mock_call.assert_called_once_with("zgw_import_data")

@patch("open_inwoner.celery.app.send_task")
def test_task_runs_once(self, mock_send: Mock):
# manually patch the task because it is not a normal class/method
key = str(uuid.uuid4())

def get_key(self, args=None, kwargs=None):
return key

import_zgw_data.get_key = get_key

# add manual cleanup so we don't litter locks
def cleanup():
import_zgw_data.once_backend.clear_lock(key)

self.addCleanup(cleanup)

# actual test
import_zgw_data.apply_async()
with self.assertRaises(AlreadyQueued):
import_zgw_data.apply_async()

mock_send.assert_called_once()
7 changes: 6 additions & 1 deletion src/open_inwoner/utils/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,14 +167,19 @@ def run_task(self, request, task_id):
task_kwargs = json.loads(periodic_task.kwargs)
task_args = json.loads(periodic_task.args)

# NOTE send_task() doesn't work with Celery_Once, use .delay() or .apply_async()
# app.send_task(periodic_task.task, args=task_args, kwargs=task_kwargs)
if task := app.tasks.get(periodic_task.task):

task = app.tasks.get(periodic_task.task)
if task:
try:
task.apply_async(args=task_args, kwargs=task_kwargs)
except AlreadyQueued:
messages.warning(request, _("De taak wordt al uitgevoerd."))
else:
messages.success(request, _("De taak wordt uitgevoerd."))
# we could redirect but the 'celery_monitor' view takes a few seconds to show the task, and doesnt auto-refresh
# return redirect(reverse("admin:celery_monitor_taskstate_changelist"))
else:
messages.warning(
request, _("Er is een probleem met het starten van de taak.")
Expand Down
9 changes: 9 additions & 0 deletions src/open_inwoner/utils/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,14 @@ class UtilsConfig(AppConfig):
name = "open_inwoner.utils"

def ready(self):
from django.apps import registry

# force the task autodiscovery
from ..celery import app
from . import checks # noqa
from .signals import copy_log_entry_to_timeline_logger # noqa

installed_apps = [
app_config.name for app_config in registry.apps.app_configs.values()
]
app.autodiscover_tasks(installed_apps, force=True)
2 changes: 1 addition & 1 deletion src/start_celery.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@

. ../env/bin/activate

#exec celery -A "open_inwoner" events -l info --camera django_celery_monitor.camera.Camera --frequency=2.0 &
exec celery -A "open_inwoner" events -l info --camera django_celery_monitor.camera.Camera --frequency=2.0 &
exec celery -A "open_inwoner" worker -E --max-tasks-per-child=50 -l info -B --scheduler django_celery_beat.schedulers:DatabaseScheduler

0 comments on commit e87ed3e

Please sign in to comment.