Skip to content

Commit

Permalink
Merge pull request #122 from jarekwg/fix/121_job_integrity_error
Browse files Browse the repository at this point in the history
Don't try to log job executions for jobs that no longer exist
  • Loading branch information
jcass77 authored Nov 1, 2020
2 parents 1d74bdb + 2931c64 commit 98c8347
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 64 deletions.
135 changes: 74 additions & 61 deletions django_apscheduler/jobstores.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,36 +41,43 @@ def handle_submission_event(cls, event: JobSubmissionEvent):
Create and return new job execution instance in the database when the job is submitted to the scheduler.
:param event: JobExecutionEvent instance
:return: DjangoJobExecution ID
:return: DjangoJobExecution ID or None if the job execution could not be logged.
"""
if event.code == events.EVENT_JOB_SUBMITTED:
# Start logging a new job execution
job_execution = DjangoJobExecution.atomic_update_or_create(
cls.lock,
event.job_id,
event.scheduled_run_times[0],
DjangoJobExecution.SENT,
)
elif event.code == events.EVENT_JOB_MAX_INSTANCES:
status = DjangoJobExecution.MAX_INSTANCES
try:
if event.code == events.EVENT_JOB_SUBMITTED:
# Start logging a new job execution
job_execution = DjangoJobExecution.atomic_update_or_create(
cls.lock,
event.job_id,
event.scheduled_run_times[0],
DjangoJobExecution.SENT,
)

exception = (
f"Execution of job '{event.job_id}' skipped: maximum number of running "
f"instances reached!"
)
elif event.code == events.EVENT_JOB_MAX_INSTANCES:
status = DjangoJobExecution.MAX_INSTANCES

job_execution = DjangoJobExecution.atomic_update_or_create(
cls.lock,
event.job_id,
event.scheduled_run_times[0],
status,
exception=exception,
)
else:
raise NotImplementedError(
f"Don't know how to handle JobSubmissionEvent '{event.code}'. Expected "
f"one of '{[events.EVENT_JOB_SUBMITTED, events.EVENT_JOB_MAX_INSTANCES]}'."
exception = (
f"Execution of job '{event.job_id}' skipped: maximum number of running "
f"instances reached!"
)

job_execution = DjangoJobExecution.atomic_update_or_create(
cls.lock,
event.job_id,
event.scheduled_run_times[0],
status,
exception=exception,
)
else:
raise NotImplementedError(
f"Don't know how to handle JobSubmissionEvent '{event.code}'. Expected "
f"one of '{[events.EVENT_JOB_SUBMITTED, events.EVENT_JOB_MAX_INSTANCES]}'."
)
except IntegrityError:
logger.warning(
f"Job '{event.job_id}' no longer exists! Skipping logging of job execution..."
)
return None

return job_execution.id

Expand Down Expand Up @@ -104,49 +111,55 @@ def handle_execution_event(cls, event: JobExecutionEvent) -> Union[int, None]:
return job_execution.id

@classmethod
def handle_error_event(cls, event: JobExecutionEvent) -> int:
def handle_error_event(cls, event: JobExecutionEvent) -> Union[int, None]:
"""
Store "failed" job execution status in the database.
:param event: JobExecutionEvent instance
:return: DjangoJobExecution ID
:return: DjangoJobExecution ID or None if the job execution could not be logged.
"""
if event.code == events.EVENT_JOB_ERROR:

if event.exception:
exception = str(event.exception)
traceback = str(event.traceback)
else:
exception = f"Job '{event.job_id}' raised an error!"
traceback = None

job_execution = DjangoJobExecution.atomic_update_or_create(
cls.lock,
event.job_id,
event.scheduled_run_time,
DjangoJobExecution.ERROR,
exception=exception,
traceback=traceback,
)

elif event.code == events.EVENT_JOB_MISSED:
# Job execution will not have been logged yet - do so now
status = DjangoJobExecution.MISSED
exception = f"Run time of job '{event.job_id}' was missed!"
try:
if event.code == events.EVENT_JOB_ERROR:

if event.exception:
exception = str(event.exception)
traceback = str(event.traceback)
else:
exception = f"Job '{event.job_id}' raised an error!"
traceback = None

job_execution = DjangoJobExecution.atomic_update_or_create(
cls.lock,
event.job_id,
event.scheduled_run_time,
DjangoJobExecution.ERROR,
exception=exception,
traceback=traceback,
)

job_execution = DjangoJobExecution.atomic_update_or_create(
cls.lock,
event.job_id,
event.scheduled_run_time,
status,
exception=exception,
)
elif event.code == events.EVENT_JOB_MISSED:
# Job execution will not have been logged yet - do so now
status = DjangoJobExecution.MISSED
exception = f"Run time of job '{event.job_id}' was missed!"

job_execution = DjangoJobExecution.atomic_update_or_create(
cls.lock,
event.job_id,
event.scheduled_run_time,
status,
exception=exception,
)

else:
raise NotImplementedError(
f"Don't know how to handle JobExecutionEvent '{event.code}'. Expected "
f"one of '{[events.EVENT_JOB_ERROR, events.EVENT_JOB_MAX_INSTANCES, events.EVENT_JOB_MISSED]}'."
else:
raise NotImplementedError(
f"Don't know how to handle JobExecutionEvent '{event.code}'. Expected "
f"one of '{[events.EVENT_JOB_ERROR, events.EVENT_JOB_MAX_INSTANCES, events.EVENT_JOB_MISSED]}'."
)
except IntegrityError:
logger.warning(
f"Job '{event.job_id}' no longer exists! Skipping logging of job execution..."
)
return None

return job_execution.id

Expand Down
4 changes: 4 additions & 0 deletions docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ This changelog is used to track all major changes to django-apscheduler.
- Update readme to clarify the need for ensuring that a single scheduler is run in your Django application until
APScheduler 4.0 arrives and django-apscheduler is migrated to make use of that version.
- Update authors section in `setup.py`.
- Don't try to log job executions for jobs that are no longer available in the job store. This was partially fixed
previously as part of [#116](https://github.com/jarekwg/django-apscheduler/issues/116), which only catered for
'execution' type of events. This fix resolves the issue for the remaining 'submitted' and 'error' events as well
(Fixes [#121](https://github.com/jarekwg/django-apscheduler/issues/121)).


## v0.5.0 (2020-10-13)
Expand Down
28 changes: 25 additions & 3 deletions tests/test_jobstores.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,28 @@ def test_handle_submission_event_not_supported_raises_exception(self, jobstore):

@pytest.mark.django_db
@pytest.mark.parametrize(
"event_code", [events.EVENT_JOB_SUBMITTED, events.EVENT_JOB_MAX_INSTANCES,],
"event_code", [events.EVENT_JOB_SUBMITTED, events.EVENT_JOB_MAX_INSTANCES, ],
)
def test_handle_submission_event_creates_job_execution(
self, event_code, jobstore, create_add_job
self, event_code, jobstore, create_add_job
):
job = create_add_job(jobstore, dummy_job, datetime(2016, 5, 3))
event = JobSubmissionEvent(event_code, job.id, jobstore, [timezone.now()])
jobstore.handle_submission_event(event)

assert DjangoJobExecution.objects.filter(job_id=event.job_id).exists()

@pytest.mark.django_db(transaction=True)
def test_handle_submission_event_for_job_that_no_longer_exists_does_not_raise_exception(
self, jobstore
):
event = JobSubmissionEvent(
events.EVENT_JOB_SUBMITTED, "finished_job", jobstore, [timezone.now()]
)
jobstore.handle_submission_event(event)

assert not DjangoJobExecution.objects.filter(job_id=event.job_id).exists()

@pytest.mark.django_db
def test_handle_execution_event_not_supported_raises_exception(self, jobstore):
event = JobExecutionEvent(
Expand Down Expand Up @@ -113,6 +124,17 @@ def test_handle_error_event_no_exception_sets_exception_text(

assert "raised an error!" in ex.exception

@pytest.mark.django_db(transaction=True)
def test_handle_error_event_for_job_that_no_longer_exists_does_not_raise_exception(
self, jobstore
):
event = JobExecutionEvent(
events.EVENT_JOB_ERROR, "finished_job", jobstore, timezone.now()
)
jobstore.handle_error_event(event)

assert not DjangoJobExecution.objects.filter(job_id=event.job_id).exists()

@pytest.mark.django_db
def test_register_event_listeners_registers_listeners(self, jobstore):
jobstore.register_event_listeners()
Expand Down Expand Up @@ -167,4 +189,4 @@ def test_register_job_raises_deprecation_warning(scheduler, jobstore):
register_job(scheduler, "interval", seconds=1)(dummy_job)
assert len(w) == 1
assert issubclass(w[-1].category, DeprecationWarning)
assert "deprecated" in str(w[-1].message)
assert "deprecated" in str(w[-1].message)

0 comments on commit 98c8347

Please sign in to comment.