Skip to content

Commit

Permalink
Introduce reusable BackgroundJob framework
Browse files Browse the repository at this point in the history
A new abstract class can be used to implement job function classes. It
handles the necessary logic for starting and stopping jobs, including
exception handling and rescheduling of recurring jobs.

This commit also includes the migration of data source jobs to the new
framework.
  • Loading branch information
alehaa committed Jun 17, 2024
1 parent 388ba3d commit 5fab8e4
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 22 deletions.
37 changes: 18 additions & 19 deletions netbox/core/jobs.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,32 @@
import logging

from netbox.search.backends import search_backend
from .choices import *
from .exceptions import SyncError
from .models import DataSource
from rq.timeouts import JobTimeoutException
from utilities.jobs import BackgroundJob

logger = logging.getLogger(__name__)


def sync_datasource(job, *args, **kwargs):
class SyncDataSourceJob(BackgroundJob):
"""
Call sync() on a DataSource.
"""
datasource = DataSource.objects.get(pk=job.object_id)

try:
job.start()
datasource.sync()
@classmethod
def run(cls, job, *args, **kwargs):
from netbox.search.backends import search_backend
from .choices import DataSourceStatusChoices
from .exceptions import SyncError
from .models import DataSource

# Update the search cache for DataFiles belonging to this source
search_backend.cache(datasource.datafiles.iterator())
datasource = DataSource.objects.get(pk=job.object_id)

job.terminate()
try:
datasource.sync()

except Exception as e:
job.terminate(status=JobStatusChoices.STATUS_ERRORED, error=repr(e))
DataSource.objects.filter(pk=datasource.pk).update(status=DataSourceStatusChoices.FAILED)
if type(e) in (SyncError, JobTimeoutException):
logging.error(e)
else:
# Update the search cache for DataFiles belonging to this source
search_backend.cache(datasource.datafiles.iterator())

except Exception as e:
DataSource.objects.filter(pk=datasource.pk).update(status=DataSourceStatusChoices.FAILED)
if type(e) is SyncError:
logging.error(e)
raise e
5 changes: 2 additions & 3 deletions netbox/core/models/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from django.db import models
from django.urls import reverse
from django.utils import timezone
from django.utils.module_loading import import_string
from django.utils.translation import gettext as _

from netbox.constants import CENSOR_TOKEN, CENSOR_TOKEN_CHANGED
Expand Down Expand Up @@ -162,8 +161,8 @@ def enqueue_sync_job(self, request):
DataSource.objects.filter(pk=self.pk).update(status=self.status)

# Enqueue a sync job
return Job.enqueue(
import_string('core.jobs.sync_datasource'),
from ..jobs import SyncDataSourceJob
return SyncDataSourceJob.enqueue(
instance=self,
user=request.user
)
Expand Down
68 changes: 68 additions & 0 deletions netbox/utilities/jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import logging
from abc import ABC, abstractmethod
from datetime import timedelta

from rq.timeouts import JobTimeoutException

from core.choices import JobStatusChoices
from core.models import Job


class BackgroundJob(ABC):
"""
Background Job helper class.
This class handles the execution of a background job. It is responsible for maintaining its state, reporting errors,
and scheduling recurring jobs.
"""

@classmethod
@abstractmethod
def run(cls, *args, **kwargs) -> None:
"""
Run the job.
A `BackgroundJob` class needs to implement this method to execute all commands of the job.
"""
pass

@classmethod
def handle(cls, job, *args, **kwargs):
"""
Handle the execution of a `BackgroundJob`.
This method is called by the Job Scheduler to handle the execution of all job commands. It will maintain the
job's metadata and handle errors. For periodic jobs, a new job is automatically scheduled using its `interval'.
"""
try:
job.start()
cls.run(job, *args, **kwargs)
job.terminate()

except Exception as e:
job.terminate(status=JobStatusChoices.STATUS_ERRORED, error=repr(e))
if type(e) is JobTimeoutException:
logging.error(e)

# If the executed job is a periodic job, schedule its next execution at the specified interval.
finally:
if job.interval:
new_scheduled_time = (job.scheduled or job.started) + timedelta(minutes=job.interval)
cls.enqueue(
instance=job.object,
name=job.name,
user=job.user,
schedule_at=new_scheduled_time,
interval=job.interval,
**kwargs,
)

@classmethod
def enqueue(cls, *args, **kwargs):
"""
Enqueue a new `BackgroundJob`.
This method is a wrapper of `Job.enqueue` using `handle()` as function callback. See its documentation for
parameters.
"""
return Job.enqueue(cls.handle, *args, **kwargs)

0 comments on commit 5fab8e4

Please sign in to comment.