Skip to content

Commit

Permalink
Closes #16971: Add system jobs (#17716)
Browse files Browse the repository at this point in the history
* Fix check for existing jobs

If a job is to be enqueued once and no specific scheduled time is
specified, any scheduled time of existing jobs will be valid. Only if a
specific scheduled time is specified for 'enqueue_once()' can it be
evaluated.

* Allow system jobs to be registered

A new registry key allows background system jobs to be registered and
automatically scheduled when rqworker starts.

* Test scheduling of system jobs

* Fix plugins scheduled job documentation

The documentation reflected a non-production state of the JobRunner
framework left over from development. Now a more practical example
demonstrates the usage.

* Allow plugins to register system jobs

* Rename system job metadata

To clarify which meta-attributes belong to system jobs, each of them is
now prefixed with 'system_'.

* Add predefined job interval choices

* Remove 'system_enabled' JobRunner attribute

Previously, the 'system_enabled' attribute was used to control whether a
job should run or not. However, this can also be accomplished by
evaluating the job's interval.

* Fix test

* Use a decorator to register system jobs

* Specify interval when registering system job

* Update documentation

---------

Co-authored-by: Jeremy Stretch <[email protected]>
  • Loading branch information
alehaa and jeremystretch authored Nov 1, 2024
1 parent 6dc75d8 commit 4bba926
Show file tree
Hide file tree
Showing 10 changed files with 147 additions and 14 deletions.
53 changes: 41 additions & 12 deletions docs/plugins/development/background-jobs.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ class MyTestJob(JobRunner):

You can schedule the background job from within your code (e.g. from a model's `save()` method or a view) by calling `MyTestJob.enqueue()`. This method passes through all arguments to `Job.enqueue()`. However, no `name` argument must be passed, as the background job name will be used instead.

!!! tip
A set of predefined intervals is available at `core.choices.JobIntervalChoices` for convenience.

### Attributes

`JobRunner` attributes are defined under a class named `Meta` within the job. These are optional, but encouraged.
Expand All @@ -46,27 +49,53 @@ As described above, jobs can be scheduled for immediate execution or at any late

#### Example

```python title="jobs.py"
from netbox.jobs import JobRunner
```python title="models.py"
from django.db import models
from core.choices import JobIntervalChoices
from netbox.models import NetBoxModel
from .jobs import MyTestJob

class MyModel(NetBoxModel):
foo = models.CharField()

def save(self, *args, **kwargs):
MyTestJob.enqueue_once(instance=self, interval=JobIntervalChoices.INTERVAL_HOURLY)
return super().save(*args, **kwargs)

def sync(self):
MyTestJob.enqueue(instance=self)
```


### System Jobs

Some plugins may implement background jobs that are decoupled from the request/response cycle. Typical use cases would be housekeeping tasks or synchronization jobs. These can be registered as _system jobs_ using the `system_job()` decorator. The job interval must be passed as an integer (in minutes) when registering a system job. System jobs are scheduled automatically when the RQ worker (`manage.py rqworker`) is run.

#### Example

```python title="jobs.py"
from core.choices import JobIntervalChoices
from netbox.jobs import JobRunner, system_job
from .models import MyModel

# Specify a predefined choice or an integer indicating
# the number of minutes between job executions
@system_job(interval=JobIntervalChoices.INTERVAL_HOURLY)
class MyHousekeepingJob(JobRunner):
class Meta:
name = "Housekeeping"
name = "My Housekeeping Job"

def run(self, *args, **kwargs):
# your logic goes here
```

```python title="__init__.py"
from netbox.plugins import PluginConfig
MyModel.objects.filter(foo='bar').delete()

class MyPluginConfig(PluginConfig):
def ready(self):
from .jobs import MyHousekeepingJob
MyHousekeepingJob.setup(interval=60)
system_jobs = (
MyHousekeepingJob,
)
```

!!! note
Ensure that any system jobs are imported on initialization. Otherwise, they won't be registered. This can be achieved by extending the PluginConfig's `ready()` method.

## Task queues

Three task queues of differing priority are defined by default:
Expand Down
2 changes: 1 addition & 1 deletion docs/plugins/development/data-backends.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ backends = [MyDataBackend]
```

!!! tip
The path to the list of search indexes can be modified by setting `data_backends` in the PluginConfig instance.
The path to the list of data backends can be modified by setting `data_backends` in the PluginConfig instance.

::: netbox.data_backends.DataBackend
14 changes: 14 additions & 0 deletions netbox/core/choices.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,20 @@ class JobStatusChoices(ChoiceSet):
)


class JobIntervalChoices(ChoiceSet):
INTERVAL_MINUTELY = 1
INTERVAL_HOURLY = 60
INTERVAL_DAILY = 60 * 24
INTERVAL_WEEKLY = 60 * 24 * 7

CHOICES = (
(INTERVAL_MINUTELY, _('Minutely')),
(INTERVAL_HOURLY, _('Hourly')),
(INTERVAL_DAILY, _('Daily')),
(INTERVAL_WEEKLY, _('Weekly')),
)


#
# ObjectChanges
#
Expand Down
11 changes: 11 additions & 0 deletions netbox/core/management/commands/rqworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from django_rq.management.commands.rqworker import Command as _Command

from netbox.registry import registry


DEFAULT_QUEUES = ('high', 'default', 'low')

Expand All @@ -14,6 +16,15 @@ class Command(_Command):
of only the 'default' queue).
"""
def handle(self, *args, **options):
# Setup system jobs.
for job, kwargs in registry['system_jobs'].items():
try:
interval = kwargs['interval']
except KeyError:
raise TypeError("System job must specify an interval (in minutes).")
logger.debug(f"Scheduling system job {job.name} (interval={interval})")
job.enqueue_once(**kwargs)

# Run the worker with scheduler functionality
options['with_scheduler'] = True

Expand Down
21 changes: 20 additions & 1 deletion netbox/netbox/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,38 @@
from abc import ABC, abstractmethod
from datetime import timedelta

from django.core.exceptions import ImproperlyConfigured
from django.utils.functional import classproperty
from django_pglocks import advisory_lock
from rq.timeouts import JobTimeoutException

from core.choices import JobStatusChoices
from core.models import Job, ObjectType
from netbox.constants import ADVISORY_LOCK_KEYS
from netbox.registry import registry

__all__ = (
'JobRunner',
'system_job',
)


def system_job(interval):
"""
Decorator for registering a `JobRunner` class as system background job.
"""
if type(interval) is not int:
raise ImproperlyConfigured("System job interval must be an integer (minutes).")

def _wrapper(cls):
registry['system_jobs'][cls] = {
'interval': interval
}
return cls

return _wrapper


class JobRunner(ABC):
"""
Background Job helper class.
Expand Down Expand Up @@ -129,7 +148,7 @@ class scheduled for `instance`, the existing job will be updated if necessary. T
if job:
# If the job parameters haven't changed, don't schedule a new job and keep the current schedule. Otherwise,
# delete the existing job and schedule a new job instead.
if (schedule_at and job.scheduled == schedule_at) and (job.interval == interval):
if (not schedule_at or job.scheduled == schedule_at) and (job.interval == interval):
return job
job.delete()

Expand Down
1 change: 1 addition & 0 deletions netbox/netbox/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def __delitem__(self, key):
'models': collections.defaultdict(set),
'plugins': dict(),
'search': dict(),
'system_jobs': dict(),
'tables': collections.defaultdict(dict),
'views': collections.defaultdict(dict),
'widgets': dict(),
Expand Down
5 changes: 5 additions & 0 deletions netbox/netbox/tests/dummy_plugin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,10 @@ class DummyPluginConfig(PluginConfig):
'netbox.tests.dummy_plugin.events.process_events_queue'
]

def ready(self):
super().ready()

from . import jobs # noqa: F401


config = DummyPluginConfig
9 changes: 9 additions & 0 deletions netbox/netbox/tests/dummy_plugin/jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from core.choices import JobIntervalChoices
from netbox.jobs import JobRunner, system_job


@system_job(interval=JobIntervalChoices.INTERVAL_HOURLY)
class DummySystemJob(JobRunner):

def run(self, *args, **kwargs):
pass
36 changes: 36 additions & 0 deletions netbox/netbox/tests/test_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,15 @@ def test_enqueue_once_twice_same(self):
self.assertEqual(job1, job2)
self.assertEqual(TestJobRunner.get_jobs(instance).count(), 1)

def test_enqueue_once_twice_same_no_schedule_at(self):
instance = DataSource()
schedule_at = self.get_schedule_at()
job1 = TestJobRunner.enqueue_once(instance, schedule_at=schedule_at)
job2 = TestJobRunner.enqueue_once(instance)

self.assertEqual(job1, job2)
self.assertEqual(TestJobRunner.get_jobs(instance).count(), 1)

def test_enqueue_once_twice_different_schedule_at(self):
instance = DataSource()
job1 = TestJobRunner.enqueue_once(instance, schedule_at=self.get_schedule_at())
Expand Down Expand Up @@ -127,3 +136,30 @@ def test_enqueue_once_after_enqueue(self):
self.assertNotEqual(job1, job2)
self.assertRaises(Job.DoesNotExist, job1.refresh_from_db)
self.assertEqual(TestJobRunner.get_jobs(instance).count(), 1)


class SystemJobTest(JobRunnerTestCase):
"""
Test that system jobs can be scheduled.
General functionality already tested by `JobRunnerTest` and `EnqueueTest`.
"""

def test_scheduling(self):
# Can job be enqueued?
job = TestJobRunner.enqueue(schedule_at=self.get_schedule_at())
self.assertIsInstance(job, Job)
self.assertEqual(TestJobRunner.get_jobs().count(), 1)

# Can job be deleted again?
job.delete()
self.assertRaises(Job.DoesNotExist, job.refresh_from_db)
self.assertEqual(TestJobRunner.get_jobs().count(), 0)

def test_enqueue_once(self):
schedule_at = self.get_schedule_at()
job1 = TestJobRunner.enqueue_once(schedule_at=schedule_at)
job2 = TestJobRunner.enqueue_once(schedule_at=schedule_at)

self.assertEqual(job1, job2)
self.assertEqual(TestJobRunner.get_jobs().count(), 1)
9 changes: 9 additions & 0 deletions netbox/netbox/tests/test_plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
from django.test import Client, TestCase, override_settings
from django.urls import reverse

from core.choices import JobIntervalChoices
from netbox.tests.dummy_plugin import config as dummy_config
from netbox.tests.dummy_plugin.data_backends import DummyBackend
from netbox.tests.dummy_plugin.jobs import DummySystemJob
from netbox.plugins.navigation import PluginMenu
from netbox.plugins.utils import get_plugin_config
from netbox.graphql.schema import Query
Expand Down Expand Up @@ -130,6 +132,13 @@ def test_data_backends(self):
self.assertIn('dummy', registry['data_backends'])
self.assertIs(registry['data_backends']['dummy'], DummyBackend)

def test_system_jobs(self):
"""
Check registered system jobs.
"""
self.assertIn(DummySystemJob, registry['system_jobs'])
self.assertEqual(registry['system_jobs'][DummySystemJob]['interval'], JobIntervalChoices.INTERVAL_HOURLY)

def test_queues(self):
"""
Check that plugin queues are registered with the accurate name.
Expand Down

0 comments on commit 4bba926

Please sign in to comment.