Skip to content

Commit

Permalink
enh(admin): Add ability to trigger job executions via DjangoJob admin…
Browse files Browse the repository at this point in the history
… page.

Resolves #102.
  • Loading branch information
jcass77 committed Oct 6, 2020
1 parent 2167470 commit 8e9348c
Show file tree
Hide file tree
Showing 8 changed files with 192 additions and 16 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ Features of this package include:

- You can view the scheduled jobs and monitor the job execution directly via the Django admin interface.

- Job executions can also be triggered manually via the DjangoJob admin page. In order to prevent long running jobs from
causing the Django request to time out, the combined maximum run time for all APScheduler jobs started in this way is
15 seconds. This timeout value can be configured via the `APSCHEDULER_RUN_NOW_TIMEOUT` setting.

![Jobs](docs/screenshots/run_now.png)

Installation
------------

Expand Down
92 changes: 80 additions & 12 deletions django_apscheduler/admin.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,37 @@
from time import sleep

import time
from datetime import timedelta

from apscheduler import events
from apscheduler.schedulers.background import BackgroundScheduler
from django.contrib import admin
from django.conf import settings
from django.contrib import admin, messages
from django.db.models import Avg
from django.utils import timezone
from django.utils.html import format_html
from django.utils.safestring import mark_safe

from django.utils.translation import gettext as _

from django_apscheduler.models import DjangoJob, DjangoJobExecution
from django_apscheduler import util
from django_apscheduler.jobstores import DjangoJobStore
from django_apscheduler.jobstores import DjangoJobStore, DjangoMemoryJobStore


@admin.register(DjangoJob)
class DjangoJobAdmin(admin.ModelAdmin):
search_fields = ["id"]
list_display = ["id", "local_run_time", "average_duration"]

def __init__(self, model, admin_site):
super().__init__(model, admin_site)

self._django_jobstore = DjangoJobStore()
self._memory_jobstore = DjangoMemoryJobStore()

self._jobs_executed = []
self._job_execution_timeout = getattr(
settings, "APSCHEDULER_RUN_NOW_TIMEOUT", 15
)

def get_queryset(self, request):
qs = super().get_queryset(request)

Expand Down Expand Up @@ -46,19 +60,73 @@ def average_duration(self, obj):

average_duration.short_description = "Average Duration (sec)"

actions = ['run_selected_jobs']
actions = ["run_selected_jobs"]

def run_selected_jobs(self, request, queryset):
scheduler = BackgroundScheduler()
scheduler.add_jobstore(DjangoJobStore(), "default")
scheduler.add_jobstore(self._memory_jobstore)
scheduler.add_listener(self._handle_execution_event, events.EVENT_JOB_EXECUTED)

scheduler.start()

num_jobs_scheduled = 0
self._jobs_executed = []
start_time = timezone.now()

for item in queryset:
job = scheduler.get_job(item.id, "default")
if job:
scheduler.modify_job(job.id, "default", next_run_time=timezone.now())
sleep(0.1)
django_job = self._django_jobstore.lookup_job(item.id)

if not django_job:
msg_dict = {"job_id": item.id}
msg = _(
"Could not find job {job_id} in the database! Skipping execution..."
)
self.message_user(
request, format_html(msg, **msg_dict), messages.WARNING
)
continue

scheduler.add_job(
django_job.func_ref,
trigger=None, # Run immediately
args=django_job.args,
kwargs=django_job.kwargs,
id=django_job.id,
name=django_job.name,
misfire_grace_time=django_job.misfire_grace_time,
coalesce=django_job.coalesce,
max_instances=django_job.max_instances,
)

num_jobs_scheduled += 1

while len(self._jobs_executed) < num_jobs_scheduled:
# Wait for selected jobs to be executed.
if timezone.now() > start_time + timedelta(
seconds=self._job_execution_timeout
):
msg = _(
"Maximum runtime exceeded! Not all jobs could be completed successfully."
)
self.message_user(request, msg, messages.ERROR)

scheduler.shutdown(wait=False)
return None

time.sleep(0.1)

for job_id in self._jobs_executed:
msg_dict = {"job_id": job_id}
msg = _("Executed job '{job_id}'!")
self.message_user(request, format_html(msg, **msg_dict))

scheduler.shutdown()
run_selected_jobs.short_description = "Run the selected django jobs"
return None

def _handle_execution_event(self, event: events.JobExecutionEvent):
self._jobs_executed.append(event.job_id)

run_selected_jobs.short_description = "Run the selected Django jobs"


@admin.register(DjangoJobExecution)
Expand Down
10 changes: 10 additions & 0 deletions django_apscheduler/jobstores.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from apscheduler.events import JobSubmissionEvent, JobExecutionEvent
from apscheduler.job import Job as AppSchedulerJob
from apscheduler.jobstores.base import BaseJobStore, JobLookupError, ConflictingIdError
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.schedulers.base import BaseScheduler

from django import db
Expand Down Expand Up @@ -288,6 +289,15 @@ def __repr__(self):
return f"<{self.__class__.__name__}(pickle_protocol={self.pickle_protocol})>"


class DjangoMemoryJobStore(DjangoResultStoreMixin, MemoryJobStore):
"""
Adds the DjangoResultStoreMixin to the standard MemoryJobStore so that job executions can be
logged to the Django database.
"""

pass


def register_events(scheduler, result_storage=None):
# TODO: Remove this deprecated function in release 0.5
# DjangoResultStoreMixin now takes care of registering event listeners automatically when the scheduler is started.
Expand Down
6 changes: 5 additions & 1 deletion docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
This changelog is used to track all major changes to django_apscheduler.


## v0.4.3 (UNRELEASED)
## v0.5.0 (UNRELEASED)

**Enhancements**

- Add ability to trigger a scheduled job manually from the `DjangoJobAdmin` page (Resolves [#102](https://github.com/jarekwg/django-apscheduler/issues/102)).

**Fixes**

Expand Down
Binary file added docs/screenshots/run_now.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
3 changes: 3 additions & 0 deletions tests/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,6 @@
"NAME": "db",
},
}

APSCHEDULER_RUN_NOW_TIMEOUT = 15
APSCHEDULER_DATETIME_FORMAT = "N j, Y, f:s a"
89 changes: 87 additions & 2 deletions tests/test_admin.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
from datetime import timedelta
from unittest import mock

import pytest
from apscheduler.schedulers.background import BackgroundScheduler
from django.contrib.messages.storage.base import BaseStorage
from django.utils import timezone

from django_apscheduler.admin import DjangoJobAdmin, DjangoJobExecutionAdmin
from django_apscheduler.jobstores import DjangoJobStore
from django_apscheduler.models import DjangoJob, DjangoJobExecution


Expand Down Expand Up @@ -79,18 +83,99 @@ def test_average_duration_returns_correct_value(self, rf, request):
assert admin.average_duration(job) == 7.5

@pytest.mark.django_db
def test_average_duration_no_executions_shows_none_text(self, request):
def test_average_duration_no_executions_shows_none_text(self, request, rf):
now = timezone.now()
run_time = now - timedelta(seconds=60)

job = DjangoJob.objects.create(id="test_job", next_run_time=run_time)
request.addfinalizer(job.delete)

admin = DjangoJobAdmin(DjangoJob, None)
admin.get_queryset(request)
r = rf.get("/django_apscheduler/djangojob/")
admin.get_queryset(r)

assert admin.average_duration(job) == "None"

@pytest.mark.django_db(transaction=True)
def test_run_selected_jobs_creates_job_execution_entry(self, rf):
scheduler = BackgroundScheduler()
scheduler.add_jobstore(DjangoJobStore())
scheduler.start()

job = scheduler.add_job(print, trigger="interval", seconds=60)

admin = DjangoJobAdmin(DjangoJob, None)

r = rf.get("/django_apscheduler/djangojob/")
# Add support for Django messaging framework
r._messages = mock.MagicMock(BaseStorage)
r._messages.add = mock.MagicMock()

assert not DjangoJobExecution.objects.filter(job_id=job.id).exists()

admin.run_selected_jobs(r, DjangoJob.objects.filter(id=job.id))

assert DjangoJobExecution.objects.filter(job_id=job.id).exists()
r._messages.add.assert_called_with(20, f"Executed job '{job.id}'!", "")

scheduler.shutdown()

@pytest.mark.django_db(transaction=True)
def test_run_selected_jobs_job_not_found_skips_execution(self, rf):
scheduler = BackgroundScheduler()
scheduler.add_jobstore(DjangoJobStore())
scheduler.start()

job = DjangoJob.objects.create(id="test_job")

admin = DjangoJobAdmin(DjangoJob, None)

r = rf.get("/django_apscheduler/djangojob/")
# Add support for Django messaging framework
r._messages = mock.MagicMock(BaseStorage)
r._messages.add = mock.MagicMock()

admin.run_selected_jobs(r, DjangoJob.objects.filter(id=job.id))

assert DjangoJobExecution.objects.count() == 0
r._messages.add.assert_called_with(
30, "Could not find job test_job in the database! Skipping execution...", ""
)

scheduler.shutdown()

@pytest.mark.django_db(transaction=True)
def test_run_selected_jobs_enforces_timeout(self, rf, settings):

settings.APSCHEDULER_RUN_NOW_TIMEOUT = (
1 # Shorten timeout to reduce test runtime
)

scheduler = BackgroundScheduler()
scheduler.add_jobstore(DjangoJobStore())
scheduler.start()

job = scheduler.add_job(print, trigger="interval", seconds=60)

admin = DjangoJobAdmin(DjangoJob, None)

r = rf.get("/django_apscheduler/djangojob/")
# Add support for Django messaging framework
r._messages = mock.MagicMock(BaseStorage)
r._messages.add = mock.MagicMock()

with mock.patch("django_apscheduler.admin.BackgroundScheduler.add_listener"):
admin.run_selected_jobs(r, DjangoJob.objects.filter(id=job.id))

assert DjangoJobExecution.objects.count() == 0
r._messages.add.assert_called_with(
40,
"Maximum runtime exceeded! Not all jobs could be completed successfully.",
"",
)

scheduler.shutdown()


class TestDjangoJobExecutionAdmin:
@pytest.mark.django_db
Expand Down
2 changes: 1 addition & 1 deletion tests/test_jobstores.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def test_handle_execution_event_creates_job_execution(

@pytest.mark.django_db(transaction=True)
def test_handle_execution_event_for_job_that_no_longer_exists_does_not_raise_exception_regression_116(
self, jobstore
self, jobstore
):
# Test for regression https://github.com/jarekwg/django-apscheduler/issues/116
event = JobExecutionEvent(
Expand Down

0 comments on commit 8e9348c

Please sign in to comment.