diff --git a/netbox/core/jobs.py b/netbox/core/jobs.py index 264313e6204..8f7ddd0d8d4 100644 --- a/netbox/core/jobs.py +++ b/netbox/core/jobs.py @@ -1,33 +1,32 @@ import logging -from netbox.search.backends import search_backend -from .choices import * -from .exceptions import SyncError -from .models import DataSource -from rq.timeouts import JobTimeoutException +from utilities.jobs import BackgroundJob logger = logging.getLogger(__name__) -def sync_datasource(job, *args, **kwargs): +class SyncDataSourceJob(BackgroundJob): """ Call sync() on a DataSource. """ - datasource = DataSource.objects.get(pk=job.object_id) - try: - job.start() - datasource.sync() + @classmethod + def run(cls, job, *args, **kwargs): + from netbox.search.backends import search_backend + from .choices import DataSourceStatusChoices + from .exceptions import SyncError + from .models import DataSource - # Update the search cache for DataFiles belonging to this source - search_backend.cache(datasource.datafiles.iterator()) + datasource = DataSource.objects.get(pk=job.object_id) - job.terminate() + try: + datasource.sync() - except Exception as e: - job.terminate(status=JobStatusChoices.STATUS_ERRORED, error=repr(e)) - DataSource.objects.filter(pk=datasource.pk).update(status=DataSourceStatusChoices.FAILED) - if type(e) in (SyncError, JobTimeoutException): - logging.error(e) - else: + # Update the search cache for DataFiles belonging to this source + search_backend.cache(datasource.datafiles.iterator()) + + except Exception as e: + DataSource.objects.filter(pk=datasource.pk).update(status=DataSourceStatusChoices.FAILED) + if type(e) is SyncError: + logging.error(e) raise e diff --git a/netbox/core/models/data.py b/netbox/core/models/data.py index 48fa2ff7130..a354edfc4c5 100644 --- a/netbox/core/models/data.py +++ b/netbox/core/models/data.py @@ -12,7 +12,6 @@ from django.db import models from django.urls import reverse from django.utils import timezone -from django.utils.module_loading import import_string from django.utils.translation import gettext as _ from netbox.constants import CENSOR_TOKEN, CENSOR_TOKEN_CHANGED @@ -162,8 +161,8 @@ def enqueue_sync_job(self, request): DataSource.objects.filter(pk=self.pk).update(status=self.status) # Enqueue a sync job - return Job.enqueue( - import_string('core.jobs.sync_datasource'), + from ..jobs import SyncDataSourceJob + return SyncDataSourceJob.enqueue( instance=self, user=request.user ) diff --git a/netbox/utilities/jobs.py b/netbox/utilities/jobs.py new file mode 100644 index 00000000000..b7177488708 --- /dev/null +++ b/netbox/utilities/jobs.py @@ -0,0 +1,68 @@ +import logging +from abc import ABC, abstractmethod +from datetime import timedelta + +from rq.timeouts import JobTimeoutException + +from core.choices import JobStatusChoices +from core.models import Job + + +class BackgroundJob(ABC): + """ + Background Job helper class. + + This class handles the execution of a background job. It is responsible for maintaining its state, reporting errors, + and scheduling recurring jobs. + """ + + @classmethod + @abstractmethod + def run(cls, *args, **kwargs) -> None: + """ + Run the job. + + A `BackgroundJob` class needs to implement this method to execute all commands of the job. + """ + pass + + @classmethod + def handle(cls, job, *args, **kwargs): + """ + Handle the execution of a `BackgroundJob`. + + This method is called by the Job Scheduler to handle the execution of all job commands. It will maintain the + job's metadata and handle errors. For periodic jobs, a new job is automatically scheduled using its `interval'. + """ + try: + job.start() + cls.run(job, *args, **kwargs) + job.terminate() + + except Exception as e: + job.terminate(status=JobStatusChoices.STATUS_ERRORED, error=repr(e)) + if type(e) is JobTimeoutException: + logging.error(e) + + # If the executed job is a periodic job, schedule its next execution at the specified interval. + finally: + if job.interval: + new_scheduled_time = (job.scheduled or job.started) + timedelta(minutes=job.interval) + cls.enqueue( + instance=job.object, + name=job.name, + user=job.user, + schedule_at=new_scheduled_time, + interval=job.interval, + **kwargs, + ) + + @classmethod + def enqueue(cls, *args, **kwargs): + """ + Enqueue a new `BackgroundJob`. + + This method is a wrapper of `Job.enqueue` using `handle()` as function callback. See its documentation for + parameters. + """ + return Job.enqueue(cls.handle, *args, **kwargs)