diff --git a/README.md b/README.md index a0c62ad..52e4330 100644 --- a/README.md +++ b/README.md @@ -46,16 +46,17 @@ So for now your options are to either: Features of this package include: -- A custom `DjangoJobStore`: an [APScheduler job store](https://apscheduler.readthedocs.io/en/latest/extending.html#custom-job-stores) +- A custom `DjangoJobStore`: + an [APScheduler job store](https://apscheduler.readthedocs.io/en/latest/extending.html#custom-job-stores) that persists scheduled jobs to the Django database. You can view the scheduled jobs and monitor the job execution directly via the Django admin interface: - - ![Jobs](docs/screenshots/job_overview.png) + + ![Jobs](https://raw.githubusercontent.com/jcass77/django-apscheduler/main/docs/screenshots/job_overview.png) - The job store also maintains a history of all job executions of the currently scheduled jobs, along with status codes and exceptions (if any): - - ![Jobs](docs/screenshots/execution_overview.png) + + ![Jobs](https://raw.githubusercontent.com/jcass77/django-apscheduler/main/docs/screenshots/execution_overview.png) - **Note:** APScheduler will [automatically remove jobs](https://apscheduler.readthedocs.io/en/latest/userguide.html#removing-jobs) from the job store as soon as their last scheduled execution has been triggered. This will also delete the @@ -63,12 +64,13 @@ Features of this package include: - Job executions can also be triggered manually via the `DjangoJob` admin page: - ![Jobs](docs/screenshots/run_now.png) + ![Jobs](https://raw.githubusercontent.com/jcass77/django-apscheduler/main/docs/screenshots/run_now.png) - **Note:** In order to prevent long running jobs from causing the Django HTTP request to time out, the combined maximum run time for all APScheduler jobs that are started via the Django admin site is 25 seconds. This timeout value can be configured via the `APSCHEDULER_RUN_NOW_TIMEOUT` setting. 
+ Installation ------------ @@ -123,27 +125,37 @@ from apscheduler.triggers.cron import CronTrigger from django.core.management.base import BaseCommand from django_apscheduler.jobstores import DjangoJobStore from django_apscheduler.models import DjangoJobExecution - +from django_apscheduler import util logger = logging.getLogger(__name__) def my_job(): - # Your job processing logic here... + # Your job processing logic here... pass + +# The `close_old_connections` decorator ensures that database connections, that have become unusable or are obsolete, +# are closed before and after our job has run. +@util.close_old_connections def delete_old_job_executions(max_age=604_800): - """This job deletes all apscheduler job executions older than `max_age` from the database.""" + """ + This job deletes APScheduler job execution entries older than `max_age` from the database. It helps to prevent the + database from filling up with old historical records that are no longer useful. + + :param max_age: The maximum length of time to retain historical job execution records. Defaults + to 7 days. + """ DjangoJobExecution.objects.delete_old_job_executions(max_age) class Command(BaseCommand): - help = "Runs apscheduler." + help = "Runs APScheduler." def handle(self, *args, **options): scheduler = BlockingScheduler(timezone=settings.TIME_ZONE) scheduler.add_jobstore(DjangoJobStore(), "default") - + scheduler.add_job( my_job, trigger=CronTrigger(second="*/10"), # Every 10 seconds @@ -188,16 +200,68 @@ Advanced Usage -------------- django-apscheduler assumes that you are already familiar with APScheduler and its proper use. If not, then please head -over to the project page and have a look through the [APScheduler documentation](https://apscheduler.readthedocs.io/en/latest/index.html). +over to the project page and have a look through +the [APScheduler documentation](https://apscheduler.readthedocs.io/en/latest/index.html). 
-It is possible to make use of [different types of schedulers](https://apscheduler.readthedocs.io/en/latest/userguide.html#choosing-the-right-scheduler-job-store-s-executor-s-and-trigger-s) +It is possible to make use +of [different types of schedulers](https://apscheduler.readthedocs.io/en/latest/userguide.html#choosing-the-right-scheduler-job-store-s-executor-s-and-trigger-s) depending on your environment and use case. If you would prefer running a `BackgroundScheduler` instead of using a `BlockingScheduler`, then you should be aware that using APScheduler with uWSGI requires some additional [configuration steps](https://apscheduler.readthedocs.io/en/latest/faq.html#how-can-i-use-apscheduler-with-uwsgi) in order to re-enable threading support. - - -## Project resources + + +Supported Databases +------------------- + +Please take note of the list of databases that +are [officially supported by Django](https://docs.djangoproject.com/en/dev/ref/databases/#databases). django-apscheduler +probably won't work with unsupported databases like Microsoft SQL Server, MongoDB, and the like. + + +Database Connections and Timeouts +--------------------------------- + +django-apscheduler is dependent on the standard Django +database [configuration settings](https://docs.djangoproject.com/en/dev/ref/databases/#general-notes). These settings, +in combination with how your database server has been configured, determine how connection management will be performed +for your specific deployment. + +The `close_old_connections` decorator should be applied to APScheduler jobs that require database access. Doing so +ensures that Django's [CONN_MAX_AGE](https://docs.djangoproject.com/en/dev/ref/settings/#std:setting-CONN_MAX_AGE) +configuration setting is enforced before and after your job is run. This mirrors the standard Django functionality of +doing the same before and after handling each HTTP request. 
+ +If you still encounter any kind of 'lost database connection' errors then it probably means that: + +- Your database connections timed out in the middle of executing a job. You should probably consider incorporating a + connection pooler as part of your deployment for more robust database connection management + (e.g. [pgbouncer](https://www.pgbouncer.org) for PostgreSQL, or the equivalent for other DB platforms). +- Your database server has crashed / been restarted. + Django [will not reconnect automatically](https://code.djangoproject.com/ticket/24810) + and you need to re-start django-apscheduler as well. + +Common footguns +--------------- + +Unless you have a very specific set of requirements, and have intimate knowledge of the inner workings of APScheduler, +you shouldn't be using `BackgroundScheduler`. This can lead to all sorts of temptations like: + +* Firing up a scheduler inside of a Django view. This will most likely cause more than one scheduler to run concurrently + and lead to jobs running multiple times (see the above introduction to this README for a more thorough treatment of + the subject). +* Bootstrapping a scheduler somewhere else inside of your Django application. It feels like this should solve the + problem mentioned above and guarantee that only one scheduler is running. The downside is that you have just delegated + all of your background task processing to whatever webserver you are using (Gunicorn, uWSGI, etc.). It will probably + kill any long-running threads (your jobs) with extreme prejudice (thinking that they are caused by misbehaving HTTP + requests). + +Relying on `BlockingScheduler` forces you to run APScheduler in its own dedicated process that is not handled or +monitored by the webserver. The example code provided in `runapscheduler.py` above is a good starting point. 
+ + +Project resources +----------------- - [Changelog](docs/changelog.md) - [Release procedures](docs/releasing.md) diff --git a/django_apscheduler/admin.py b/django_apscheduler/admin.py index b07f4e2..c54f1d9 100644 --- a/django_apscheduler/admin.py +++ b/django_apscheduler/admin.py @@ -9,7 +9,7 @@ from django.utils import timezone from django.utils.html import format_html from django.utils.safestring import mark_safe -from django.utils.translation import gettext as _ +from django.utils.translation import gettext_lazy as _ from django_apscheduler.models import DjangoJob, DjangoJobExecution from django_apscheduler import util @@ -27,7 +27,8 @@ def __init__(self, model, admin_site): self._django_jobstore = DjangoJobStore() self._memory_jobstore = DjangoMemoryJobStore() - self._jobs_executed = [] + self._jobs_scheduled = None + self._jobs_executed = None self._job_execution_timeout = getattr( settings, "APSCHEDULER_RUN_NOW_TIMEOUT", 15 ) @@ -58,7 +59,7 @@ def average_duration(self, obj): except DjangoJobExecution.DoesNotExist: return "None" - average_duration.short_description = "Average Duration (sec)" + average_duration.short_description = _("Average Duration (sec)") actions = ["run_selected_jobs"] @@ -69,21 +70,16 @@ def run_selected_jobs(self, request, queryset): scheduler.start() - num_jobs_scheduled = 0 - self._jobs_executed = [] + self._jobs_scheduled = set() + self._jobs_executed = set() start_time = timezone.now() for item in queryset: django_job = self._django_jobstore.lookup_job(item.id) if not django_job: - msg_dict = {"job_id": item.id} - msg = _( - "Could not find job {job_id} in the database! Skipping execution..." - ) - self.message_user( - request, format_html(msg, **msg_dict), messages.WARNING - ) + msg = _("Could not find job {} in the database! 
Skipping execution...") + self.message_user(request, format_html(msg, item.id), messages.WARNING) continue scheduler.add_job( @@ -98,17 +94,26 @@ def run_selected_jobs(self, request, queryset): max_instances=django_job.max_instances, ) - num_jobs_scheduled += 1 + self._jobs_scheduled.add(django_job.id) - while len(self._jobs_executed) < num_jobs_scheduled: + while self._jobs_scheduled != self._jobs_executed: # Wait for selected jobs to be executed. if timezone.now() > start_time + timedelta( seconds=self._job_execution_timeout ): msg = _( - "Maximum runtime exceeded! Not all jobs could be completed successfully." + "Maximum runtime of {} seconds exceeded! Not all jobs could be completed successfully. " + "Pending jobs: {}" + ) + self.message_user( + request, + format_html( + msg, + self._job_execution_timeout, + ",".join(self._jobs_scheduled - self._jobs_executed), + ), + messages.ERROR, ) - self.message_user(request, msg, messages.ERROR) scheduler.shutdown(wait=False) return None @@ -116,17 +121,15 @@ def run_selected_jobs(self, request, queryset): time.sleep(0.1) for job_id in self._jobs_executed: - msg_dict = {"job_id": job_id} - msg = _("Executed job '{job_id}'!") - self.message_user(request, format_html(msg, **msg_dict)) + self.message_user(request, format_html(_("Executed job '{}'!"), job_id)) scheduler.shutdown() return None def _handle_execution_event(self, event: events.JobExecutionEvent): - self._jobs_executed.append(event.job_id) + self._jobs_executed.add(event.job_id) - run_selected_jobs.short_description = "Run the selected django jobs" + run_selected_jobs.short_description = _("Run the selected django jobs") @admin.register(DjangoJobExecution) @@ -153,5 +156,5 @@ def local_run_time(self, obj): def duration_text(self, obj): return obj.duration or "N/A" - html_status.short_description = "Status" - duration_text.short_description = "Duration (sec)" + html_status.short_description = _("Status") + duration_text.short_description = _("Duration (sec)") diff 
--git a/django_apscheduler/jobstores.py b/django_apscheduler/jobstores.py index 6f82d67..6c05434 100644 --- a/django_apscheduler/jobstores.py +++ b/django_apscheduler/jobstores.py @@ -13,6 +13,7 @@ from django import db from django.db import transaction, IntegrityError +from django_apscheduler import util from django_apscheduler.models import DjangoJob, DjangoJobExecution from django_apscheduler.util import ( get_apscheduler_datetime, @@ -181,7 +182,8 @@ class is used for each event code. ) self._scheduler.add_listener( - self.handle_error_event, events.EVENT_JOB_ERROR | events.EVENT_JOB_MISSED, + self.handle_error_event, + events.EVENT_JOB_ERROR | events.EVENT_JOB_MISSED, ) @@ -199,6 +201,7 @@ def __init__(self, pickle_protocol: int = pickle.HIGHEST_PROTOCOL): super().__init__() self.pickle_protocol = pickle_protocol + @util.retry_on_db_operational_error def lookup_job(self, job_id: str) -> Union[None, AppSchedulerJob]: try: job_state = DjangoJob.objects.get(id=job_id).job_state @@ -211,6 +214,7 @@ def get_due_jobs(self, now) -> List[AppSchedulerJob]: dt = get_django_internal_datetime(now) return self._get_jobs(next_run_time__lte=dt) + @util.retry_on_db_operational_error def get_next_run_time(self): try: job = DjangoJob.objects.filter(next_run_time__isnull=False).earliest( @@ -227,6 +231,7 @@ def get_all_jobs(self): return jobs + @util.retry_on_db_operational_error def add_job(self, job: AppSchedulerJob): with transaction.atomic(): try: @@ -238,6 +243,7 @@ def add_job(self, job: AppSchedulerJob): except IntegrityError: raise ConflictingIdError(job.id) + @util.retry_on_db_operational_error def update_job(self, job: AppSchedulerJob): # Acquire lock for update with transaction.atomic(): @@ -254,12 +260,14 @@ def update_job(self, job: AppSchedulerJob): except DjangoJob.DoesNotExist: raise JobLookupError(job.id) + @util.retry_on_db_operational_error def remove_job(self, job_id: str): try: DjangoJob.objects.get(id=job_id).delete() except DjangoJob.DoesNotExist: raise 
JobLookupError(job_id) + @util.retry_on_db_operational_error def remove_all_jobs(self): # Implicit: will also delete all DjangoJobExecutions due to on_delete=models.CASCADE DjangoJob.objects.all().delete() @@ -276,6 +284,7 @@ def _reconstitute_job(self, job_state): return job + @util.retry_on_db_operational_error def _get_jobs(self, **filters): jobs = [] failed_job_ids = set() diff --git a/django_apscheduler/migrations/0001_initial.py b/django_apscheduler/migrations/0001_initial.py index 4c0189d..8f36652 100644 --- a/django_apscheduler/migrations/0001_initial.py +++ b/django_apscheduler/migrations/0001_initial.py @@ -29,7 +29,9 @@ class Migration(migrations.Migration): ("next_run_time", models.DateTimeField(db_index=True)), ("job_state", models.BinaryField()), ], - options={"ordering": ("next_run_time",),}, + options={ + "ordering": ("next_run_time",), + }, ), migrations.CreateModel( name="DjangoJobExecution", @@ -88,6 +90,8 @@ class Migration(migrations.Migration): ), ), ], - options={"ordering": ("-run_time",),}, + options={ + "ordering": ("-run_time",), + }, ), ] diff --git a/django_apscheduler/migrations/0006_remove_djangojob_name.py b/django_apscheduler/migrations/0006_remove_djangojob_name.py index 7ef7f80..956f4a7 100644 --- a/django_apscheduler/migrations/0006_remove_djangojob_name.py +++ b/django_apscheduler/migrations/0006_remove_djangojob_name.py @@ -10,5 +10,8 @@ class Migration(migrations.Migration): ] operations = [ - migrations.RemoveField(model_name="djangojob", name="name",), + migrations.RemoveField( + model_name="djangojob", + name="name", + ), ] diff --git a/django_apscheduler/migrations/0008_remove_djangojobexecution_started.py b/django_apscheduler/migrations/0008_remove_djangojobexecution_started.py index 8d8c3c8..d1f74b5 100644 --- a/django_apscheduler/migrations/0008_remove_djangojobexecution_started.py +++ b/django_apscheduler/migrations/0008_remove_djangojobexecution_started.py @@ -10,5 +10,8 @@ class Migration(migrations.Migration): ] 
operations = [ - migrations.RemoveField(model_name="djangojobexecution", name="started",), + migrations.RemoveField( + model_name="djangojobexecution", + name="started", + ), ] diff --git a/django_apscheduler/models.py b/django_apscheduler/models.py index e74bc01..e83cfdc 100644 --- a/django_apscheduler/models.py +++ b/django_apscheduler/models.py @@ -2,7 +2,7 @@ from django.db import models, transaction from django.utils import timezone -from django.utils.translation import ugettext_lazy as _ +from django.utils.translation import gettext_lazy as _ import logging @@ -58,7 +58,14 @@ class DjangoJobExecution(models.Model): MAX_INSTANCES = "Max instances!" ERROR = "Error!" - STATUS_CHOICES = [(x, x) for x in [SENT, ERROR, SUCCESS,]] + STATUS_CHOICES = [ + (x, x) + for x in [ + SENT, + ERROR, + SUCCESS, + ] + ] id = models.BigAutoField( primary_key=True, help_text=_("Unique ID for this job execution.") @@ -79,7 +86,8 @@ class DjangoJobExecution(models.Model): ) run_time = models.DateTimeField( - db_index=True, help_text=_("Date and time at which this job was executed."), + db_index=True, + help_text=_("Date and time at which this job was executed."), ) # We store this value in the DB even though it can be calculated as `finished - run_time`. This allows quick @@ -118,6 +126,7 @@ class DjangoJobExecution(models.Model): objects = DjangoJobExecutionManager() @classmethod + @util.retry_on_db_operational_error def atomic_update_or_create( cls, lock, @@ -128,7 +137,7 @@ def atomic_update_or_create( traceback: str = None, ) -> "DjangoJobExecution": """ - Uses an APScheduler lock to ensures that only one database entry can be created / updated at a time. + Uses an APScheduler lock to ensure that only one database entry can be created / updated at a time. This keeps django_apscheduler in sync with APScheduler and maintains a 1:1 mapping between APScheduler events that are triggered and the corresponding DjangoJobExecution model instances that are persisted to the database. 
diff --git a/django_apscheduler/util.py b/django_apscheduler/util.py index 18d01c6..dac4600 100644 --- a/django_apscheduler/util.py +++ b/django_apscheduler/util.py @@ -1,10 +1,15 @@ +import logging from datetime import datetime +from functools import wraps from apscheduler.schedulers.base import BaseScheduler +from django import db from django.conf import settings from django.utils import formats from django.utils import timezone +logger = logging.getLogger(__name__) + def get_dt_format() -> str: """Return the configured format for displaying datetimes in the Django admin views""" @@ -45,3 +50,85 @@ def get_apscheduler_datetime(dt: datetime, scheduler: BaseScheduler) -> datetime return timezone.make_aware(dt, timezone=scheduler.timezone) return dt + + +def retry_on_db_operational_error(func): + """ + This decorator can be used to wrap a database-related method so that it will be retried when a + django.db.OperationalError is encountered. + + The rationale is that django.db.OperationalError is usually raised when attempting to use an old database + connection that the database backend has since closed. Closing the Django connection as well, and re-trying with + a fresh connection, is usually sufficient to solve the problem. + + It is a reluctant workaround for users that persistently have issues with stale database connections (most notably: + 2006, 'MySQL server has gone away'). + + The recommended approach is still to install a database connection pooler (like pgbouncer), to take care of + database connection management for you, but the issue has been raised enough times by different individuals that a + workaround is probably justified. + + CAUTION: any method that this decorator is applied to MUST be idempotent (i.e. the method can be retried a second + time without any unwanted side effects). If your method performs any actions before the django.db.OperationalError + is raised then those actions will be repeated. 
If you don't want that to happen then it would be best to handle the + django.db.OperationalError exception manually and call `db.close_old_connections()` in an appropriate fashion + inside your own method instead. + + The following alternative workarounds were also considered: + + 1. Calling db.close_old_connections() pre-emptively before the job store executes a DB operation: this would break + Django's standard connection management. For example, if the `CONN_MAX_AGE` setting is set to 0, a new connection + will be required for *every* database operation (as opposed to at the end of every *request* like in the Django + standard). The database overhead, and associated performance penalty, that this approach would impose seem + unreasonable. See: https://docs.djangoproject.com/en/dev/ref/settings/#std:setting-CONN_MAX_AGE. + + 2. Using a custom QuerySet or database backend that handles django.db.OperationalError automatically: this would + be more convenient than having to decorate individual methods, but it would also break when a DB operation needs + to be re-tried as part of an atomic transaction. See: https://github.com/django/django/pull/2740 + + 3. Pinging the database before each operation to see if it is still available: django-apscheduler used to make use + of this approach (see: https://github.com/jcass77/django-apscheduler/blob/9ac06b33d19961da6c36d5ac814d4338beb11309/django_apscheduler/models.py#L16-L51). + Injecting an additional database query, on an arbitrary schedule, seems like an unreasonable thing to do, + especially considering that this would be unnecessary for users that already make use of a database connection + pooler to manage their connections properly. + """ + + @wraps(func) + def func_wrapper(*args, **kwargs): + try: + result = func(*args, **kwargs) + except db.OperationalError as e: + logger.warning( + f"DB error executing '{func.__name__}' ({e}). Retrying with a new DB connection..." 
+ ) + db.close_old_connections() + result = func(*args, **kwargs) + + return result + + return func_wrapper + + +def close_old_connections(func): + """ + A decorator that ensures that Django database connections that have become unusable, or are obsolete, are closed + before and after a method is executed (see: https://docs.djangoproject.com/en/dev/ref/databases/#general-notes + for background). + + This decorator is intended to be used to wrap APScheduler jobs, and provides functionality comparable to the + Django standard approach of closing old connections before and after each HTTP request is processed. + + It only makes sense for APScheduler jobs that require database access, and prevents `django.db.OperationalError`s. + """ + + @wraps(func) + def func_wrapper(*args, **kwargs): + db.close_old_connections() + try: + result = func(*args, **kwargs) + finally: + db.close_old_connections() + + return result + + return func_wrapper diff --git a/docs/changelog.md b/docs/changelog.md index 63bb52f..0f97e86 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -2,6 +2,25 @@ This changelog is used to track all major changes to django-apscheduler. +## v0.6.0 (2021-06-17) + +**Fixes** + +- Fix screenshot links in README to work on PyPI. +- Remove reference to deprecated `django.utils.translation.ugettext_lazy`. + +**Enhancements** + +- The Django admin page will now show a list of all the manually triggered jobs that could not be completed + before `settings.APSCHEDULER_RUN_NOW_TIMEOUT` seconds elapsed. +- Make more of the string output on the admin page Django-translatable. +- Introduce a `retry_on_db_operational_error` utility decorator for retrying database-related operations when + a `django.db.OperationalError` is encountered (Partial resolution + of [#145](https://github.com/jcass77/django-apscheduler/issues/145)). +- Introduce a `close_old_connections` utility decorator to enforce Django's `CONN_MAX_AGE` setting. 
(Partial resolution + of [#145](https://github.com/jcass77/django-apscheduler/issues/145)). **This decorator should be applied to all of + your jobs that require access to the database.** + ## v0.5.2 (2021-01-28) **Enhancements** @@ -184,4 +203,4 @@ This changelog is used to track all major changes to django-apscheduler. ## Pre-releases -- The project did not tag a number of pre-release versions. \ No newline at end of file +- The project did not tag a number of pre-release versions. diff --git a/setup.py b/setup.py index 71b88db..3b3c3be 100644 --- a/setup.py +++ b/setup.py @@ -9,7 +9,7 @@ setup( name="django-apscheduler", - version="0.5.2", + version="0.6.0", description="APScheduler for Django", long_description=long_description, long_description_content_type="text/markdown", @@ -37,6 +37,9 @@ ], keywords="django apscheduler django-apscheduler", packages=find_packages(exclude=("tests",)), - install_requires=["django>=2.2", "apscheduler>=3.2,<4.0", ], + install_requires=[ + "django>=2.2", + "apscheduler>=3.2,<4.0", + ], zip_safe=False, ) diff --git a/tests/conftest.py b/tests/conftest.py index f851a53..0a5ff5c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -7,12 +7,18 @@ from apscheduler.job import Job from apscheduler.schedulers.base import BaseScheduler from apscheduler.schedulers.blocking import BlockingScheduler +from django import db from django.db import transaction from django_apscheduler.jobstores import DjangoJobStore from django_apscheduler.models import DjangoJob +def raise_db_operational_error(*args, **kwargs): + """Helper method for triggering a db.OperationalError as a side effect of executing mocked DB operations""" + raise db.OperationalError("Some DB-related error") + + class DummyScheduler(BaseScheduler): def __init__(self, *args, **kwargs): super(DummyScheduler, self).__init__(*args, **kwargs) diff --git a/tests/test_admin.py b/tests/test_admin.py index 28fae81..7bf7f64 100644 --- a/tests/test_admin.py +++ b/tests/test_admin.py @@ 
-3,8 +3,10 @@ import pytest from apscheduler.schedulers.background import BackgroundScheduler +from django.conf import settings from django.contrib.messages.storage.base import BaseStorage from django.utils import timezone +from django.utils.html import format_html from django_apscheduler.admin import DjangoJobAdmin, DjangoJobExecutionAdmin from django_apscheduler.jobstores import DjangoJobStore @@ -97,7 +99,11 @@ def test_average_duration_no_executions_shows_none_text(self, request, rf): assert admin.average_duration(job) == "None" @pytest.mark.django_db(transaction=True) - def test_run_selected_jobs_creates_job_execution_entry(self, rf): + def test_run_selected_jobs_creates_job_execution_entry(self, rf, monkeypatch): + monkeypatch.setattr( + settings, "APSCHEDULER_RUN_NOW_TIMEOUT", 1 + ) # Shorten timeout to reduce test runtime + scheduler = BackgroundScheduler() scheduler.add_jobstore(DjangoJobStore()) scheduler.start() @@ -145,11 +151,10 @@ def test_run_selected_jobs_job_not_found_skips_execution(self, rf): scheduler.shutdown() @pytest.mark.django_db(transaction=True) - def test_run_selected_jobs_enforces_timeout(self, rf, settings): - - settings.APSCHEDULER_RUN_NOW_TIMEOUT = ( - 1 # Shorten timeout to reduce test runtime - ) + def test_run_selected_jobs_enforces_timeout(self, rf, monkeypatch): + monkeypatch.setattr( + settings, "APSCHEDULER_RUN_NOW_TIMEOUT", 1 + ) # Shorten timeout to reduce test runtime scheduler = BackgroundScheduler() scheduler.add_jobstore(DjangoJobStore()) @@ -170,7 +175,12 @@ def test_run_selected_jobs_enforces_timeout(self, rf, settings): assert DjangoJobExecution.objects.count() == 0 r._messages.add.assert_called_with( 40, - "Maximum runtime exceeded! Not all jobs could be completed successfully.", + format_html( + "Maximum runtime of {} seconds exceeded! Not all jobs could be completed successfully. 
" + "Pending jobs: {}", + admin._job_execution_timeout, + ",".join({job.id}), + ), "", ) @@ -186,7 +196,9 @@ def test_html_status_returns_colored_status_text(self, rf, request): request.addfinalizer(job.delete) execution = DjangoJobExecution.objects.create( - job=job, status=DjangoJobExecution.SUCCESS, run_time=now, + job=job, + status=DjangoJobExecution.SUCCESS, + run_time=now, ) admin = DjangoJobExecutionAdmin(DjangoJob, None) @@ -202,7 +214,9 @@ def test_duration_text_no_duration_returns_na(self, rf, request): request.addfinalizer(job.delete) execution = DjangoJobExecution.objects.create( - job=job, status=DjangoJobExecution.SUCCESS, run_time=now, + job=job, + status=DjangoJobExecution.SUCCESS, + run_time=now, ) admin = DjangoJobExecutionAdmin(DjangoJob, None) diff --git a/tests/test_jobstores.py b/tests/test_jobstores.py index 69135c0..b721e3a 100644 --- a/tests/test_jobstores.py +++ b/tests/test_jobstores.py @@ -1,9 +1,11 @@ import warnings from datetime import datetime +from unittest import mock import pytest from apscheduler import events from apscheduler.events import JobExecutionEvent, JobSubmissionEvent +from django import db from django.utils import timezone from django_apscheduler.jobstores import ( @@ -12,6 +14,7 @@ register_events, ) from django_apscheduler.models import DjangoJob, DjangoJobExecution +from tests import conftest from tests.conftest import DummyScheduler, dummy_job @@ -33,7 +36,11 @@ def test_handle_submission_event_not_supported_raises_exception(self, jobstore): @pytest.mark.django_db @pytest.mark.parametrize( - "event_code", [events.EVENT_JOB_SUBMITTED, events.EVENT_JOB_MAX_INSTANCES, ], + "event_code", + [ + events.EVENT_JOB_SUBMITTED, + events.EVENT_JOB_MAX_INSTANCES, + ], ) def test_handle_submission_event_creates_job_execution( self, event_code, jobstore, create_add_job @@ -99,7 +106,11 @@ def test_handle_error_event_not_supported_raises_exception(self, jobstore): @pytest.mark.django_db @pytest.mark.parametrize( - "event_code", 
[events.EVENT_JOB_MISSED, events.EVENT_JOB_ERROR,], + "event_code", + [ + events.EVENT_JOB_MISSED, + events.EVENT_JOB_ERROR, + ], ) def test_handle_error_event_creates_job_execution( self, jobstore, create_add_job, event_code @@ -159,7 +170,115 @@ class TestDjangoJobStore: See 'test_apscheduler_jobstore.py' for details """ - pass + @pytest.mark.django_db(transaction=True) + def test_lookup_job_does_retry_on_db_operational_error(self, jobstore): + with mock.patch.object(db.connection, "close") as close_mock: + with pytest.raises(db.OperationalError, match="Some DB-related error"): + with mock.patch( + "django_apscheduler.jobstores.DjangoJob.objects.get", + side_effect=conftest.raise_db_operational_error, + ): + jobstore.lookup_job("some job") + + assert close_mock.call_count == 1 + + @pytest.mark.django_db(transaction=True) + def test_get_due_jobs_does_retry_on_db_operational_error(self, jobstore): + with mock.patch.object(db.connection, "close") as close_mock: + with pytest.raises(db.OperationalError, match="Some DB-related error"): + with mock.patch( + "django_apscheduler.jobstores.DjangoJob.objects.filter", + side_effect=conftest.raise_db_operational_error, + ): + jobstore.get_due_jobs(datetime(2016, 5, 3)) + + assert close_mock.call_count == 1 + + @pytest.mark.django_db(transaction=True) + def test_get_next_run_time_does_retry_on_db_operational_error(self, jobstore): + with mock.patch.object(db.connection, "close") as close_mock: + with pytest.raises(db.OperationalError, match="Some DB-related error"): + with mock.patch( + "django_apscheduler.jobstores.DjangoJob.objects.filter", + side_effect=conftest.raise_db_operational_error, + ): + jobstore.get_next_run_time() + + assert close_mock.call_count == 1 + + @pytest.mark.django_db(transaction=True) + def test_add_job_does_retry_on_db_operational_error(self, jobstore, create_job): + job = create_job( + func=dummy_job, + trigger="date", + trigger_args={"run_date": datetime(2016, 5, 3)}, + id="test", + ) + + with 
mock.patch.object(db.connection, "close") as close_mock: + with pytest.raises(db.OperationalError, match="Some DB-related error"): + with mock.patch( + "django_apscheduler.jobstores.DjangoJob.objects.create", + side_effect=conftest.raise_db_operational_error, + ): + jobstore.add_job(job) + + assert close_mock.call_count == 1 + + @pytest.mark.django_db(transaction=True) + def test_update_job_does_retry_on_db_operational_error(self, jobstore, create_job): + job = create_job( + func=dummy_job, + trigger="date", + trigger_args={"run_date": datetime(2016, 5, 3)}, + id="test", + ) + + with mock.patch.object(db.connection, "close") as close_mock: + with pytest.raises(db.OperationalError, match="Some DB-related error"): + with mock.patch( + "django_apscheduler.jobstores.DjangoJob.objects.get", + side_effect=conftest.raise_db_operational_error, + ): + jobstore.update_job(job) + + assert close_mock.call_count == 1 + + @pytest.mark.django_db(transaction=True) + def test_remove_job_does_retry_on_db_operational_error(self, jobstore): + with mock.patch.object(db.connection, "close") as close_mock: + with pytest.raises(db.OperationalError, match="Some DB-related error"): + with mock.patch( + "django_apscheduler.jobstores.DjangoJob.objects.get", + side_effect=conftest.raise_db_operational_error, + ): + jobstore.remove_job("some job") + + assert close_mock.call_count == 1 + + @pytest.mark.django_db(transaction=True) + def test_remove_all_jobs_does_retry_on_db_operational_error(self, jobstore): + with mock.patch.object(db.connection, "close") as close_mock: + with pytest.raises(db.OperationalError, match="Some DB-related error"): + with mock.patch( + "django_apscheduler.jobstores.DjangoJob.objects.all", + side_effect=conftest.raise_db_operational_error, + ): + jobstore.remove_all_jobs() + + assert close_mock.call_count == 1 + + @pytest.mark.django_db(transaction=True) + def test_get_jobs_does_retry_on_db_operational_error(self, jobstore): + with mock.patch.object(db.connection, 
"close") as close_mock: + with pytest.raises(db.OperationalError, match="Some DB-related error"): + with mock.patch( + "django_apscheduler.jobstores.DjangoJob.objects.filter", + side_effect=conftest.raise_db_operational_error, + ): + jobstore._get_jobs() + + assert close_mock.call_count == 1 @pytest.mark.django_db diff --git a/tests/test_models.py b/tests/test_models.py index 91733f7..7ce1a4d 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -1,12 +1,15 @@ import logging from datetime import timedelta from threading import RLock +from unittest import mock import pytest from apscheduler import events +from django import db from django.utils import timezone from django_apscheduler.models import DjangoJobExecution, DjangoJob +from tests import conftest logging.basicConfig() @@ -96,7 +99,10 @@ def test_atomic_update_or_create_updates_existing_jobs(self, request, jobstore): assert ex.finished is None DjangoJobExecution.atomic_update_or_create( - RLock(), ex.job_id, ex.run_time, DjangoJobExecution.SUCCESS, + RLock(), + ex.job_id, + ex.run_time, + DjangoJobExecution.SUCCESS, ) ex.refresh_from_db() @@ -124,7 +130,10 @@ def test_atomic_update_or_create_ignores_late_submission_events( assert ex.finished is None DjangoJobExecution.atomic_update_or_create( - RLock(), ex.job_id, ex.run_time, DjangoJobExecution.SENT, + RLock(), + ex.job_id, + ex.run_time, + DjangoJobExecution.SENT, ) ex.refresh_from_db() @@ -133,6 +142,35 @@ def test_atomic_update_or_create_ignores_late_submission_events( assert ex.duration is None assert ex.finished is None + @pytest.mark.django_db(transaction=True) + def test_atomic_update_or_create_does_retry_on_db_operational_error( + self, request, jobstore + ): + now = timezone.now() + job = DjangoJob.objects.create(id="test_job", next_run_time=now) + request.addfinalizer(job.delete) + + ex = DjangoJobExecution.objects.create( + job_id=job.id, + run_time=job.next_run_time - timedelta(seconds=5), + status=DjangoJobExecution.SENT, + ) + + with 
mock.patch.object(db.connection, "close") as close_mock: + with pytest.raises(db.OperationalError, match="Some DB-related error"): + with mock.patch( + "django_apscheduler.models.DjangoJobExecution.objects.select_for_update", + side_effect=conftest.raise_db_operational_error, + ): + DjangoJobExecution.atomic_update_or_create( + RLock(), + ex.job_id, + ex.run_time, + DjangoJobExecution.SUCCESS, + ) + + assert close_mock.call_count == 1 + @pytest.mark.django_db def test_str(self, request): now = timezone.now() diff --git a/tests/test_util.py b/tests/test_util.py index 8d0a51c..167a3d2 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -1,7 +1,10 @@ from datetime import datetime +from unittest import mock import pytest import pytz +from django import db + from django.utils import timezone from django_apscheduler import util @@ -59,3 +62,113 @@ def test_get_apscheduler_datetime(scheduler): apscheduler_dt = util.get_apscheduler_datetime(datetime.now(), scheduler) assert timezone.is_aware(apscheduler_dt) + + +@pytest.mark.django_db +def test_retry_on_db_operational_error_no_db_errors(caplog): + @util.retry_on_db_operational_error + def dummy_db_op(): + return + + with mock.patch.object(db.connection, "close") as close_mock: + dummy_db_op() + assert not close_mock.called + + assert "Retrying with a new DB connection..." 
not in caplog.text + + +@pytest.mark.django_db +def test_retry_on_db_operational_error_db_operational_error_retry_ok(caplog): + def dummy_func_maker(): + call_count = 0 + + @util.retry_on_db_operational_error + def dummy_db_op(): + nonlocal call_count + call_count += 1 + if call_count == 1: + # Raise exception on first call + raise db.OperationalError("Some DB-related error") + + return call_count + + return dummy_db_op + + func = dummy_func_maker() + with mock.patch.object(db.connection, "close") as close_mock: + call_count = func() + assert call_count == 2 + assert close_mock.call_count == 1 + + assert ( + "DB error executing 'dummy_db_op' (Some DB-related error). Retrying with a new DB connection..." + in caplog.text + ) + + +@pytest.mark.django_db +def test_retry_on_db_operational_error_db_operational_error_retry_error_persists_re_raises( + caplog, +): + @util.retry_on_db_operational_error + def func(): + raise db.OperationalError("Some DB-related error") + + with mock.patch.object(db.connection, "close") as close_mock: + with pytest.raises(db.OperationalError, match="Some DB-related error"): + call_count = func() + assert call_count == 2 + + assert close_mock.call_count == 1 + + assert ( + "DB error executing 'func' (Some DB-related error). Retrying with a new DB connection..." + in caplog.text + ) + + +@pytest.mark.django_db +def test_retry_on_db_operational_error_non_db_operational_error_re_raises( + caplog, +): + @util.retry_on_db_operational_error + def func(): + raise RuntimeError("Some non DB-related error") + + with mock.patch.object(db.connection, "close") as close_mock: + with pytest.raises(RuntimeError, match="Some non DB-related error"): + func() + + assert not close_mock.called + + assert ( + "DB error executing 'dummy_db_op' (Some DB-related error). Retrying with a new DB connection..." 
+ not in caplog.text + ) + + +def test_close_old_connections_calls_close_old_connections(): + @util.close_old_connections + def job_mock(): + pass + + with mock.patch( + "django_apscheduler.util.db.close_old_connections" + ) as close_old_connections_mock: + job_mock() + + assert close_old_connections_mock.call_count == 2 + + +def test_close_old_connections_even_if_exception_is_raised(): + @util.close_old_connections + def job_mock(): + raise RuntimeError("some error") + + with mock.patch( + "django_apscheduler.util.db.close_old_connections" + ) as close_old_connections_mock: + with pytest.raises(RuntimeError, match="some error"): + job_mock() + + assert close_old_connections_mock.call_count == 2