Skip to content

Commit

Permalink
refactor: General code refactoring and cleanup.
Browse files Browse the repository at this point in the history
  • Loading branch information
jcass77 committed Jul 16, 2020
1 parent 2d357b4 commit 56028e3
Show file tree
Hide file tree
Showing 9 changed files with 317 additions and 151 deletions.
56 changes: 33 additions & 23 deletions django_apscheduler/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,22 @@

from django.contrib import admin
from django.db.models import Avg
from django.utils.safestring import mark_safe
from django.utils.timezone import now

from django_apscheduler.models import DjangoJob, DjangoJobExecution
from django_apscheduler import util


def execute_now(ma, r, qs):
for item in qs:
item.next_run_time = now()
item.save()


execute_now.short_description = "Force tasks to execute right now"


@admin.register(DjangoJob)
class DjangoJobAdmin(admin.ModelAdmin):
list_display = ["id", "name", "next_run_time_sec", "average_duration"]
actions = []
search_fields = ["name"]
list_display = ["id", "name", "next_run_time", "average_duration"]

def get_queryset(self, request):
self._durations = {
item[0]: item[1]
for item in DjangoJobExecution.objects.filter(
job: duration
for job, duration in DjangoJobExecution.objects.filter(
status=DjangoJobExecution.SUCCESS,
run_time__gte=now() - datetime.timedelta(days=2),
)
Expand All @@ -34,27 +26,45 @@ def get_queryset(self, request):
}
return super().get_queryset(request)

def next_run_time_sec(self, obj):
def next_run_time(self, obj):
if obj.next_run_time is None:
return "(paused)"
return util.localize(obj.next_run_time)

def average_duration(self, obj):
return self._durations.get(obj.id) or 0
return self._durations.get(obj.id, "None")

average_duration.short_description = "Average Duration (sec)"


@admin.register(DjangoJobExecution)
class DjangoJobExecutionAdmin(admin.ModelAdmin):
list_display = ["id", "job", "html_status", "run_time_sec", "duration"]

status_color_mapping = {
DjangoJobExecution.ADDED: "RoyalBlue",
DjangoJobExecution.SENT: "SkyBlue",
DjangoJobExecution.MAX_INSTANCES: "yellow",
DjangoJobExecution.MISSED: "yellow",
DjangoJobExecution.MODIFIED: "yellow",
DjangoJobExecution.REMOVED: "red",
DjangoJobExecution.ERROR: "red",
DjangoJobExecution.SUCCESS: "green",
}

list_display = ["id", "job", "html_status", "local_run_time", "duration_text"]
list_filter = ["job__name", "run_time", "status"]

def run_time_sec(self, obj):
def html_status(self, obj):
return mark_safe(
f'<p style="color: {self.status_color_mapping[obj.status]}">{obj.status}</p>'
)

def local_run_time(self, obj):
return util.localize(obj.run_time)

def duration_text(self, obj):
return obj.duration or "N/A"

def get_queryset(self, request):
return (
super()
.get_queryset(request)
.select_related("job")
)
return super().get_queryset(request).select_related("job")

html_status.short_description = "Status"
4 changes: 4 additions & 0 deletions django_apscheduler/docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ This changelog is used to track all major changes to django_apscheduler.
- Clean up unused dependencies / update dependencies to latest available versions.
- Switch to Black code formatting.
- Align package layout with official [Django recommendations](https://docs.djangoproject.com/en/dev/intro/reusable-apps/#packaging-your-app)
- Move UI-related DjangoJobExecution.html_status out of model definition and in to the associated model admin definition.
- Add `help_text` to model fields to document their use.
- Remove unused code fragments.
- Add Python type annotations.

**Fixes**

Expand Down
122 changes: 61 additions & 61 deletions django_apscheduler/jobstores.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import logging
import pickle
import warnings
from typing import Union, List

from apscheduler import events
from apscheduler.events import JobExecutionEvent, JobSubmissionEvent
from apscheduler.job import Job
from apscheduler.jobstores.base import BaseJobStore, JobLookupError
from apscheduler.schedulers.base import BaseScheduler

from django.core.exceptions import ObjectDoesNotExist
from django import db
from django.db import connections
from django.db.utils import OperationalError, ProgrammingError
Expand All @@ -20,6 +19,7 @@
logger = logging.getLogger("django_apscheduler")


# TODO: Remove this workaround, which seems to mute DB-related exceptions?
def ignore_database_error(on_error_value=None):
def dec(func):
from functools import wraps
Expand All @@ -30,10 +30,8 @@ def inner(*a, **k):
return func(*a, **k)
except (OperationalError, ProgrammingError) as e:
warnings.warn(
"Got OperationalError: {}. "
"Please, check that you have migrated the database via python manage.py migrate".format(
e
),
f"Got OperationalError: {e}. Please, check that you have migrated the database via python "
f"manage.py migrate",
category=RuntimeWarning,
stacklevel=3,
)
Expand All @@ -49,35 +47,40 @@ def inner(*a, **k):
class DjangoJobStore(BaseJobStore):
"""
Stores jobs in a Django database.
:param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the
highest available
highest available
"""

def __init__(self, pickle_protocol=pickle.HIGHEST_PROTOCOL):
def __init__(self, pickle_protocol: int = pickle.HIGHEST_PROTOCOL):
super().__init__()
self.pickle_protocol = pickle_protocol

@ignore_database_error()
def lookup_job(self, job_id):
logger.debug("Lookup for a job %s", job_id)
def lookup_job(self, job_id: str) -> Union[None, Job]:
logger.debug(f"Lookup for job '{job_id}'...")
try:
job_state = DjangoJob.objects.get(name=job_id).job_state
r = self._reconstitute_job(job_state) if job_state else None
logger.debug(f"Found job: {r}")

return r

except DjangoJob.DoesNotExist:
return None
r = self._reconstitute_job(job_state) if job_state else None
logger.debug("Got %s", r)
return r

# TODO: Remove this (unused?) method?
@ignore_database_error(on_error_value=[])
def get_due_jobs(self, now):
logger.debug("get_due_jobs for time=%s", now)
def get_due_jobs(self, now) -> List[Job]:
logger.debug(f"get_due_jobs for time={now}...")
try:
out = self._get_jobs(next_run_time__lte=serialize_dt(now))
logger.debug("Got %s", out)
return out
jobs = self._get_jobs(next_run_time__lte=serialize_dt(now))
logger.debug(f"Found job: {jobs}")

return jobs
# TODO: Make this except clause more specific
except Exception:
logger.exception("Exception during getting jobs")
logger.exception("Exception during 'get_due_jobs'")
return []

@ignore_database_error()
Expand All @@ -88,21 +91,26 @@ def get_next_run_time(self):
.earliest("next_run_time")
.next_run_time
)
except ObjectDoesNotExist: # no active jobs
return
except DjangoJob.DoesNotExist:
# No active jobs - OK
pass

# TODO: Make this except clause more specific
except Exception:
logger.exception("Exception during get_next_run_time for jobs")

return None

@ignore_database_error(on_error_value=[])
def get_all_jobs(self):
jobs = self._get_jobs()
self._fix_paused_jobs_sorting(jobs)

return jobs

@ignore_database_error()
def add_job(self, job):
dbJob, created = DjangoJob.objects.get_or_create(
def add_job(self, job: Job):
db_job, created = DjangoJob.objects.get_or_create(
defaults=dict(
next_run_time=serialize_dt(job.next_run_time),
job_state=pickle.dumps(job.__getstate__(), self.pickle_protocol),
Expand All @@ -112,39 +120,36 @@ def add_job(self, job):

if not created:
logger.warning(
"Job with id %s already in jobstore. I'll refresh it", job.id
f"Job with id '{job.id}' already in jobstore! I'll refresh it."
)
dbJob.next_run_time = serialize_dt(job.next_run_time)
dbJob.job_state = pickle.dumps(job.__getstate__(), self.pickle_protocol)
dbJob.save()
db_job.next_run_time = serialize_dt(job.next_run_time)
db_job.job_state = pickle.dumps(job.__getstate__(), self.pickle_protocol)
db_job.save()

@ignore_database_error()
def update_job(self, job):
def update_job(self, job: Job):
updated = DjangoJob.objects.filter(name=job.id).update(
next_run_time=serialize_dt(job.next_run_time),
job_state=pickle.dumps(job.__getstate__(), self.pickle_protocol),
)

logger.debug(
"Update job %s: next_run_time=%s, job_state=%s",
job,
serialize_dt(job.next_run_time),
job.__getstate__(),
f"Update job '{job}': next_run_time={serialize_dt(job.next_run_time)}, job_state={job.__getstate__()}",
)

if updated == 0:
logger.info("Job with id %s not found", job.id)
logger.info(f"Job with id '{job.id}' not found")
raise JobLookupError(job.id)

@ignore_database_error()
def remove_job(self, job_id):
qs = DjangoJob.objects.filter(name=job_id)
if not qs.exists():
logger.warning("Job with id %s not found. Can't remove job.", job_id)
qs.delete()
def remove_job(self, job_id: str):
_, num_deleted = DjangoJob.objects.filter(name=job_id).delete()
if num_deleted == 0:
logger.warning(f"Job with id '{job_id}' not found. Cannot remove job.")

@ignore_database_error()
def remove_all_jobs(self):
# TODO: Replace raw SQL below with Django ORM equivalents?
with connections["default"].cursor() as c:
c.execute(
"""
Expand All @@ -156,29 +161,32 @@ def remove_all_jobs(self):
def _reconstitute_job(self, job_state):
job_state = pickle.loads(job_state)
job_state["jobstore"] = self

job = Job.__new__(Job)
job.__setstate__(job_state)
job._scheduler = self._scheduler
job._jobstore_alias = self._alias

return job

def _get_jobs(self, **filters):
job_states = DjangoJob.objects.filter(**filters).values_list("id", "job_state")
jobs = []
failed_job_ids = set()

for job_id, job_state in job_states:
try:
jobs.append(self._reconstitute_job(job_state))
# TODO: Make this except clause more specific
except Exception:
self._logger.exception(
'Unable to restore job "%s" -- removing it', job_id
f"Unable to restore job '{job_id}'. Removing it..."
)
failed_job_ids.add(job_id)

# Remove all the jobs we failed to restore
if failed_job_ids:
logger.warning("Remove bad jobs: %s", failed_job_ids)
logger.warning(f"Removing bad jobs: {failed_job_ids}")
DjangoJob.objects.filter(id__in=failed_job_ids).delete()

def map_jobs(job):
Expand All @@ -188,50 +196,42 @@ def map_jobs(job):
return list(map(map_jobs, jobs))


def event_name(code):
for key in dir(events):
if getattr(events, key) == code:
return key


class _EventManager:

logger = logger.getChild("events")

def __init__(self, storage=None):
self.storage = storage or DjangoResultStorage()

def __call__(self, event):
logger.debug("Got event: %s, %s, %s", event, type(event), event.__dict__)
# print event, type(event), event.__dict__
def __call__(self, event: JobSubmissionEvent):
logger.debug(f"Received event: {event}, {type(event)}, {event.__dict__}")
try:
if isinstance(event, JobSubmissionEvent):
self._process_submission_event(event)

elif isinstance(event, JobExecutionEvent):
self._process_execution_event(event)

except Exception as e:
self.logger.exception(str(e))

@ignore_database_error()
def _process_submission_event(self, event):
# type: (JobSubmissionEvent)->None

def _process_submission_event(self, event: JobSubmissionEvent):
try:
job = DjangoJob.objects.get(name=event.job_id)
except ObjectDoesNotExist:
self.logger.warning("Job with id %s not found in database", event.job_id)
except DjangoJob.DoesNotExist:
self.logger.warning(f"Job with id '{event.job_id}' not found in database!")
return

self.storage.get_or_create_job_execution(job, event)

@ignore_database_error()
def _process_execution_event(self, event):
# type: (JobExecutionEvent)->None
def _process_execution_event(self, event: JobExecutionEvent):

try:
job = DjangoJob.objects.get(name=event.job_id)
except ObjectDoesNotExist:
self.logger.warning("Job with id %s not found in database", event.job_id)
except DjangoJob.DoesNotExist:
self.logger.warning(f"Job with id '{event.job_id}' not found in database")
return

self.storage.register_job_executed(job, event)
Expand All @@ -241,11 +241,10 @@ def register_events(scheduler, result_storage=None):
scheduler.add_listener(_EventManager(result_storage))


def register_job(scheduler, *a, **k):
# type: (BaseScheduler)->callable
def register_job(scheduler: BaseScheduler, *a, **k) -> callable:
"""
Helper decorator for job registration.
Automatically fills id parameter to prevent jobs duplication.
See this comment for explanation: https://github.com/jarekwg/django-apscheduler/pull/9#issuecomment-342074372
Expand All @@ -265,6 +264,7 @@ def test_job():
def inner(func):
k.setdefault("id", f"{func.__module__}.{func.__name__}")
scheduler.add_job(func, *a, **k)

return func

return inner
Loading

0 comments on commit 56028e3

Please sign in to comment.