15692: Introduce background jobs #16927

Merged 38 commits on Jul 30, 2024
Changes from 34 commits
Commits
5fab8e4
Introduce reusable BackgroundJob framework
alehaa Jun 17, 2024
957bc3d
Restore using import_string for jobs
alehaa Jun 20, 2024
db591d4
Use SyncDataSourceJob for management command
alehaa Jun 20, 2024
53a4420
Implement BackgroundJob for running scripts
alehaa Jun 21, 2024
7fb1875
Fix documentation of model features
alehaa Jun 30, 2024
212262d
Ensure consitent code style
alehaa Jun 30, 2024
9dc6099
Introduce reusable ScheduledJob
alehaa Jul 1, 2024
4880d81
Introduce reusable SystemJob
alehaa Jul 1, 2024
d78ddfc
Add documentation for jobs framework
alehaa Jul 1, 2024
fd8d537
Merge branch 'feature' into 15692-background-jobs
alehaa Jul 16, 2024
15f888c
Revert "Use SyncDataSourceJob for management"
alehaa Jul 24, 2024
7d15ec0
Merge enqueued status into JobStatusChoices
alehaa Jul 24, 2024
9f1989c
Fix logger for ScriptJob
alehaa Jul 24, 2024
257976d
Remove job name for scripts
alehaa Jul 24, 2024
58089c7
Merge ScheduledJob into BackgroundJob
alehaa Jul 24, 2024
fb75389
Add name attribute for BackgroundJob
alehaa Jul 24, 2024
654e6e7
Drop enqueue_sync_job() method from DataSource
jeremystretch Jul 24, 2024
62380fb
Import ScriptJob directly
jeremystretch Jul 24, 2024
d6432fb
Relax requirement for Jobs to reference a specific object
jeremystretch Jul 24, 2024
b3f122a
Rename 'run_now' arg on Job.enqueue() to 'immediate'
jeremystretch Jul 24, 2024
3e1cc1b
Merge branch '15692-cherry' into 15692-background-jobs
alehaa Jul 25, 2024
bcad8cf
Fix queue lookup in Job enqueue
alehaa Jul 25, 2024
0b15ecf
Collapse SystemJob into BackgroundJob
alehaa Jul 25, 2024
309ad29
Remove legacy JobResultStatusChoices
alehaa Jul 25, 2024
b17b205
Use queue 'low' for system jobs by default
alehaa Jul 25, 2024
60e4e81
Add test cases for BackgroundJob handling
alehaa Jul 25, 2024
bd4a21c
Fix enqueue interval jobs
alehaa Jul 25, 2024
4c2ba09
Honor schedule_at for job's enqueue_once
alehaa Jul 25, 2024
c047bf4
Switch BackgroundJob to regular methods
alehaa Jul 25, 2024
e65e87c
Fix background tasks documentation
alehaa Jul 25, 2024
3fc3d37
Test enqueue in combination with enqueue_once
alehaa Jul 25, 2024
cecc2b8
Rename background jobs to tasks (to differentiate from RQ)
jeremystretch Jul 29, 2024
c098d1c
Touch up docs
jeremystretch Jul 29, 2024
b9cf078
Revert "Use queue 'low' for system jobs by default"
alehaa Jul 29, 2024
32ebe7b
Remove system background job
alehaa Jul 30, 2024
ecf8e79
Fix runscript management command
alehaa Jul 30, 2024
7f0a4e3
Use regular imports for ScriptJob
alehaa Jul 30, 2024
85b9f65
Rename BackgroundJob to JobRunner
jeremystretch Jul 30, 2024
2 changes: 1 addition & 1 deletion docs/development/models.md
@@ -18,7 +18,7 @@ Depending on its classification, each NetBox model may support various features
| [Custom links](../customization/custom-links.md) | `CustomLinksMixin` | `custom_links` | These models support the assignment of custom links |
| [Custom validation](../customization/custom-validation.md) | `CustomValidationMixin` | - | Supports the enforcement of custom validation rules |
| [Export templates](../customization/export-templates.md) | `ExportTemplatesMixin` | `export_templates` | Users can create custom export templates for these models |
| [Job results](../features/background-jobs.md) | `JobsMixin` | `jobs` | Users can create custom export templates for these models |
| [Job results](../features/background-jobs.md) | `JobsMixin` | `jobs` | Background jobs can be scheduled for these models |
| [Journaling](../features/journaling.md) | `JournalingMixin` | `journaling` | These models support persistent historical commentary |
| [Synchronized data](../integrations/synchronized-data.md) | `SyncedDataMixin` | `synced_data` | Certain model data can be automatically synchronized from a remote data source |
| [Tagging](../models/extras/tag.md) | `TagsMixin` | `tags` | The models can be tagged with user-defined tags |
103 changes: 103 additions & 0 deletions docs/plugins/development/background-jobs.md
@@ -0,0 +1,103 @@
# Background Jobs

NetBox plugins can defer certain operations by enqueuing [background jobs](../../features/background-jobs.md), which are executed asynchronously by background workers. This is helpful for decoupling long-running processes from the user-facing request-response cycle.

For example, your plugin might need to fetch data from a remote system. Depending on the amount of data and the responsiveness of the remote server, this could take a few minutes. Deferring this task to a background job ensures that it can be completed in the background, without interrupting the user. The data it fetches can be made available once the job has completed.

## Background Job

A background job implements a basic [Job](../../models/core/job.md) executor for all kinds of tasks. It manages the associated job object, reschedules periodic jobs at the given interval, and handles errors. To add a custom job, subclass NetBox's `BackgroundJob` class.

::: utilities.jobs.BackgroundJob

#### Example

```python title="jobs.py"
from utilities.jobs import BackgroundJob

class MyTestJob(BackgroundJob):
    class Meta:
        name = "My Test Job"

    def run(self, *args, **kwargs):
        obj = self.job.object
        # your logic goes here
```

You can schedule the background job from within your code (e.g. from a model's `save()` method or a view) by calling `MyTestJob.enqueue()`. This method passes all of its arguments through to `Job.enqueue()`. Do not pass a `name` argument, however: the background job's own name is used instead.
Review comment (Collaborator): Might be good to give an example here of calling enqueue.
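
A minimal sketch of such a call follows. So that it runs without NetBox installed, `Job` and `BackgroundJob` are replaced here by small hypothetical stand-ins that only model the behavior described above (arguments are passed through, and the name comes from `Meta`); they are not NetBox's implementation. The `device` object and the `"admin"` user are placeholders.

```python
# Hypothetical stand-ins so the sketch runs without NetBox. They model the
# documented behavior only and are NOT NetBox's implementation.
class Job:
    @classmethod
    def enqueue(cls, func, instance=None, name='', user=None,
                schedule_at=None, interval=None, **kwargs):
        # Record the request instead of creating a database row and an RQ job
        return {'func': func, 'instance': instance, 'name': name, 'user': user,
                'schedule_at': schedule_at, 'interval': interval}


class BackgroundJob:
    class Meta:
        pass

    @classmethod
    def enqueue(cls, *args, **kwargs):
        # Pass all arguments through, supplying the name from Meta
        # (falling back to the class name if Meta defines none)
        name = getattr(cls.Meta, 'name', cls.__name__)
        return Job.enqueue(cls, *args, name=name, **kwargs)


class MyTestJob(BackgroundJob):
    class Meta:
        name = "My Test Job"

    def run(self, *args, **kwargs):
        pass


device = object()  # placeholder for the NetBox object the job pertains to
job = MyTestJob.enqueue(instance=device, user="admin")
print(job['name'])
```

In this model, passing `name=...` to `MyTestJob.enqueue()` raises a `TypeError`, because the name is already supplied by the class. Within NetBox itself, the call in this PR's `sync()` view has the same shape: `SyncDataSourceJob.enqueue(instance=datasource, user=request.user)`.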


### Job Attributes

Background job attributes are defined under a class named `Meta` within the job. These are optional, but encouraged.

#### `name`

This is the human-friendly name of your background job. If omitted, the class name will be used.

### Scheduled Jobs

As described above, jobs can be scheduled for immediate execution or at any later time using the `enqueue()` method. For management purposes, however, the `enqueue_once()` method schedules a job exactly once and so avoids duplicates: if a job is already scheduled for a particular instance, a second one won't be scheduled, and this check respects thread safety. An example use case is a periodic task that is bound to an instance in general, but not to any particular event of that instance (such as an update). The parameters of `enqueue_once()` are identical to those of `enqueue()`.

!!! tip
    It is not forbidden to `enqueue()` additional jobs while an interval schedule is active. An example use of this would be to schedule a periodic daily synchronization, but also trigger additional synchronizations on demand when the user presses a button.
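
The difference between `enqueue()` and `enqueue_once()` can be illustrated with a small in-memory model. Everything here (`FakeJob`, `FakeScheduler`, the status strings) is hypothetical and exists only for illustration; it has no locking and does not say what NetBox does when an existing job has a different interval, so the sketch simply returns the existing job.

```python
from dataclasses import dataclass, field
from typing import Any, List, Optional

# Statuses treated as "still enqueued"; the set echoes ENQUEUED_STATE_CHOICES
ENQUEUED = {"pending", "scheduled", "running"}


@dataclass
class FakeJob:
    name: str
    instance: Any = None
    interval: Optional[int] = None
    status: str = "pending"


@dataclass
class FakeScheduler:
    jobs: List[FakeJob] = field(default_factory=list)

    def enqueue(self, name, instance=None, interval=None):
        # Always creates a new job, even if a similar one exists
        job = FakeJob(name, instance, interval)
        self.jobs.append(job)
        return job

    def enqueue_once(self, name, instance=None, interval=None):
        # Reuse a job for the same name and instance that has not finished yet
        for job in self.jobs:
            if job.name == name and job.instance is instance and job.status in ENQUEUED:
                return job
        return self.enqueue(name, instance, interval)


scheduler = FakeScheduler()
device = object()  # placeholder for a NetBox object

first = scheduler.enqueue_once("Daily sync", device, interval=1440)
second = scheduler.enqueue_once("Daily sync", device, interval=1440)  # no duplicate
on_demand = scheduler.enqueue("Daily sync", device)  # extra run, still allowed
```

Here `first` and `second` are the same job, while `on_demand` is an additional one, which matches the tip above about triggering extra synchronizations alongside a periodic schedule.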

### System Jobs

A system background job is not bound to any particular NetBox object. A typical use case for these jobs is a general synchronization of NetBox objects from another system or housekeeping.

The `setup()` method can be used to set up a new scheduled job outside the request-response cycle. It can be safely called from the plugin's `ready()` method and will register the new schedule right after all plugins are loaded and the database is connected.

!!! note
    Unless otherwise configured, system background jobs use the `default` queue for scheduling. This can be changed with the [`QUEUE_MAPPINGS`](../../configuration/miscellaneous.md#queue_mappings) setting, using `None` as the model.

#### Example

```python title="jobs.py"
from utilities.jobs import BackgroundJob

class MyHousekeepingJob(BackgroundJob):
    class Meta:
        name = "Housekeeping"

    def run(self, *args, **kwargs):
        # your logic goes here
        pass
```
```python title="__init__.py"
from netbox.plugins import PluginConfig

class MyPluginConfig(PluginConfig):
    def ready(self):
        from .jobs import MyHousekeepingJob
        MyHousekeepingJob.setup(interval=60)
```

## Task queues

Three task queues of differing priority are defined by default:

* High
* Default
* Low

Any tasks in the "high" queue are completed before the default queue is checked, and any tasks in the default queue are completed before those in the "low" queue.
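
The strict ordering described above can be modeled in a few lines. This is a simplified illustration of the priority rule only, not the RQ worker's code; `QUEUE_ORDER`, `next_task()` and the task names are hypothetical.

```python
from collections import deque

# Queues are checked in this order on every iteration
QUEUE_ORDER = ("high", "default", "low")


def next_task(queues):
    """Return (queue_name, task) from the first non-empty queue, or None."""
    for name in QUEUE_ORDER:
        if queues[name]:
            return name, queues[name].popleft()
    return None


queues = {name: deque() for name in QUEUE_ORDER}
queues["low"].append("housekeeping")
queues["default"].append("sync")
queues["high"].append("urgent")

# Drain all queues and record the order in which tasks were picked up
processed = []
while (item := next_task(queues)) is not None:
    processed.append(item[1])

print(processed)
```

Although the tasks were added in the order low, default, high, they are processed as `urgent`, `sync`, `housekeeping`.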

Plugins can also add custom queues for their own needs by setting the `queues` attribute under the PluginConfig class. An example is included below:

```python
from netbox.plugins import PluginConfig

class MyPluginConfig(PluginConfig):
    name = 'myplugin'
    ...
    queues = [
        'foo',
        'bar',
    ]
```

The `PluginConfig` above creates two custom queues named `myplugin.foo` and `myplugin.bar`. (The plugin's name is prepended to each queue to avoid conflicts between plugins.)

!!! warning "Configuring the RQ worker process"
    By default, NetBox's RQ worker process only services the high, default, and low queues. Plugins which introduce custom queues should advise users to either reconfigure the default worker, or run a dedicated worker specifying the necessary queues. For example:

    ```
    python manage.py rqworker myplugin.foo myplugin.bar
    ```
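
As a rough illustration of the naming rule, the following sketch derives the prefixed queue names and the matching worker command. The helper `plugin_queue_names()` and the plugin name `example_plugin` are hypothetical and chosen only for this example.

```python
def plugin_queue_names(plugin_name, queues):
    """Prefix each queue with the plugin's name, as described above."""
    return [f"{plugin_name}.{queue}" for queue in queues]


# Hypothetical plugin with two custom queues
names = plugin_queue_names("example_plugin", ["foo", "bar"])

# Build the command a user would run for a dedicated worker
command = "python manage.py rqworker " + " ".join(names)
print(command)
```

A plugin's documentation could generate its worker instructions this way, so the command always lists every queue the plugin declares.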
30 changes: 0 additions & 30 deletions docs/plugins/development/background-tasks.md

This file was deleted.

1 change: 1 addition & 0 deletions docs/plugins/development/index.md
@@ -47,6 +47,7 @@ project-name/
- __init__.py
- filtersets.py
- graphql.py
- jobs.py
- models.py
- middleware.py
- navigation.py
2 changes: 2 additions & 0 deletions docs/plugins/development/models.md
@@ -130,6 +130,8 @@ For more information about database migrations, see the [Django documentation](h

::: netbox.models.features.ExportTemplatesMixin

::: netbox.models.features.JobsMixin

::: netbox.models.features.JournalingMixin

::: netbox.models.features.TagsMixin
2 changes: 1 addition & 1 deletion mkdocs.yml
@@ -145,7 +145,7 @@ nav:
- Data Backends: 'plugins/development/data-backends.md'
- REST API: 'plugins/development/rest-api.md'
- GraphQL API: 'plugins/development/graphql-api.md'
- Background Tasks: 'plugins/development/background-tasks.md'
- Background Jobs: 'plugins/development/background-jobs.md'
- Dashboard Widgets: 'plugins/development/dashboard-widgets.md'
- Staged Changes: 'plugins/development/staged-changes.md'
- Exceptions: 'plugins/development/exceptions.md'
8 changes: 7 additions & 1 deletion netbox/core/api/views.py
@@ -7,6 +7,8 @@
from rest_framework.viewsets import ReadOnlyModelViewSet

from core import filtersets
from core.choices import DataSourceStatusChoices
from core.jobs import SyncDataSourceJob
from core.models import *
from netbox.api.metadata import ContentTypeMetadata
from netbox.api.viewsets import NetBoxModelViewSet, NetBoxReadOnlyModelViewSet
@@ -36,7 +38,11 @@ def sync(self, request, pk):
if not request.user.has_perm('core.sync_datasource', obj=datasource):
raise PermissionDenied(_("This user does not have permission to synchronize this data source."))

datasource.enqueue_sync_job(request)
# Enqueue the sync job & update the DataSource's status
SyncDataSourceJob.enqueue(instance=datasource, user=request.user)
datasource.status = DataSourceStatusChoices.QUEUED
DataSource.objects.filter(pk=datasource.pk).update(status=datasource.status)

serializer = serializers.DataSourceSerializer(datasource, context={'request': request})

return Response(serializer.data)
6 changes: 6 additions & 0 deletions netbox/core/choices.py
@@ -59,6 +59,12 @@ class JobStatusChoices(ChoiceSet):
        (STATUS_FAILED, _('Failed'), 'red'),
    )

    ENQUEUED_STATE_CHOICES = (
        STATUS_PENDING,
        STATUS_SCHEDULED,
        STATUS_RUNNING,
    )

    TERMINAL_STATE_CHOICES = (
        STATUS_COMPLETED,
        STATUS_ERRORED,
32 changes: 16 additions & 16 deletions netbox/core/jobs.py
@@ -1,33 +1,33 @@
import logging

from netbox.search.backends import search_backend
from .choices import *
from utilities.jobs import BackgroundJob
from .choices import DataSourceStatusChoices
from .exceptions import SyncError
from .models import DataSource
from rq.timeouts import JobTimeoutException

logger = logging.getLogger(__name__)


def sync_datasource(job, *args, **kwargs):
class SyncDataSourceJob(BackgroundJob):
"""
Call sync() on a DataSource.
"""
datasource = DataSource.objects.get(pk=job.object_id)

try:
job.start()
datasource.sync()
class Meta:
name = 'Synchronization'

# Update the search cache for DataFiles belonging to this source
search_backend.cache(datasource.datafiles.iterator())
def run(self, *args, **kwargs):
datasource = DataSource.objects.get(pk=self.job.object_id)

job.terminate()
try:
datasource.sync()

except Exception as e:
job.terminate(status=JobStatusChoices.STATUS_ERRORED, error=repr(e))
DataSource.objects.filter(pk=datasource.pk).update(status=DataSourceStatusChoices.FAILED)
if type(e) in (SyncError, JobTimeoutException):
logging.error(e)
else:
# Update the search cache for DataFiles belonging to this source
search_backend.cache(datasource.datafiles.iterator())

except Exception as e:
DataSource.objects.filter(pk=datasource.pk).update(status=DataSourceStatusChoices.FAILED)
if type(e) is SyncError:
logging.error(e)
raise e
24 changes: 24 additions & 0 deletions netbox/core/migrations/0012_job_object_type_optional.py
@@ -0,0 +1,24 @@
import django.db.models.deletion
from django.db import migrations, models


class Migration(migrations.Migration):

    dependencies = [
        ('contenttypes', '0002_remove_content_type_name'),
        ('core', '0011_move_objectchange'),
    ]

    operations = [
        migrations.AlterField(
            model_name='job',
            name='object_type',
            field=models.ForeignKey(
                blank=True,
                null=True,
                on_delete=django.db.models.deletion.CASCADE,
                related_name='jobs',
                to='contenttypes.contenttype'
            ),
        ),
    ]
19 changes: 1 addition & 18 deletions netbox/core/models/data.py
@@ -1,18 +1,17 @@
import hashlib
import logging
import os
import yaml
from fnmatch import fnmatchcase
from urllib.parse import urlparse

import yaml
from django.conf import settings
from django.contrib.contenttypes.fields import GenericForeignKey
from django.core.exceptions import ValidationError
from django.core.validators import RegexValidator
from django.db import models
from django.urls import reverse
from django.utils import timezone
from django.utils.module_loading import import_string
from django.utils.translation import gettext as _

from netbox.constants import CENSOR_TOKEN, CENSOR_TOKEN_CHANGED
@@ -23,7 +22,6 @@
from ..choices import *
from ..exceptions import SyncError
from ..signals import post_sync, pre_sync
from .jobs import Job

__all__ = (
'AutoSyncRecord',
@@ -153,21 +151,6 @@ def to_objectchange(self, action):

return objectchange

def enqueue_sync_job(self, request):
"""
Enqueue a background job to synchronize the DataSource by calling sync().
"""
# Set the status to "syncing"
self.status = DataSourceStatusChoices.QUEUED
DataSource.objects.filter(pk=self.pk).update(status=self.status)

# Enqueue a sync job
return Job.enqueue(
import_string('core.jobs.sync_datasource'),
instance=self,
user=request.user
)

def get_backend(self):
backend_params = self.parameters or {}
return self.backend_class(self.source_url, **backend_params)
31 changes: 25 additions & 6 deletions netbox/core/models/jobs.py
@@ -31,6 +31,8 @@ class Job(models.Model):
to='contenttypes.ContentType',
related_name='jobs',
on_delete=models.CASCADE,
blank=True,
null=True
)
object_id = models.PositiveBigIntegerField(
blank=True,
@@ -197,25 +199,34 @@ def terminate(self, status=JobStatusChoices.STATUS_COMPLETED, error=None):
job_end.send(self)

@classmethod
def enqueue(cls, func, instance, name='', user=None, schedule_at=None, interval=None, **kwargs):
def enqueue(cls, func, instance=None, name='', user=None, schedule_at=None, interval=None, immediate=False, **kwargs):
"""
Create a Job instance and enqueue a job using the given callable

Args:
func: The callable object to be enqueued for execution
instance: The NetBox object to which this job pertains
instance: The NetBox object to which this job pertains (optional)
name: Name for the job (optional)
user: The user responsible for running the job
schedule_at: Schedule the job to be executed at the passed date and time
interval: Recurrence interval (in minutes)
immediate: Run the job immediately without scheduling it in the background. Should be used for interactive
management commands only.
"""
object_type = ObjectType.objects.get_for_model(instance, for_concrete_model=False)
rq_queue_name = get_queue_for_model(object_type.model)
if schedule_at and immediate:
raise ValueError("enqueue() cannot be called with values for both schedule_at and immediate.")
Review comment (Collaborator): Should be internationalized with _(


if instance:
object_type = ObjectType.objects.get_for_model(instance, for_concrete_model=False)
object_id = instance.pk
else:
object_type = object_id = None
rq_queue_name = get_queue_for_model(object_type.model if object_type else None)
queue = django_rq.get_queue(rq_queue_name)
status = JobStatusChoices.STATUS_SCHEDULED if schedule_at else JobStatusChoices.STATUS_PENDING
job = Job.objects.create(
object_type=object_type,
object_id=instance.pk,
object_id=object_id,
name=name,
status=status,
scheduled=schedule_at,
@@ -224,8 +235,16 @@ def enqueue(cls, func, instance, name='', user=None, schedule_at=None, interval=
job_id=uuid.uuid4()
)

if schedule_at:
# Run the job immediately, rather than enqueuing it as a background task. Note that this is a synchronous
# (blocking) operation, and execution will pause until the job completes.
if immediate:
func(job_id=str(job.job_id), job=job, **kwargs)

# Schedule the job to run at a specific date & time.
elif schedule_at:
queue.enqueue_at(schedule_at, func, job_id=str(job.job_id), job=job, **kwargs)

# Schedule the job to run asynchronously at this first available opportunity.
else:
queue.enqueue(func, job_id=str(job.job_id), job=job, **kwargs)
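
The three dispatch branches at the end of `Job.enqueue()` can be exercised in isolation with the sketch below. `RecordingQueue` and `dispatch()` are hypothetical stand-ins written for this illustration: the queue only records what was requested, and the returned labels are descriptive strings, not NetBox status values.

```python
from datetime import datetime


class RecordingQueue:
    """Hypothetical stand-in for an RQ queue; it only records requests."""

    def __init__(self):
        self.calls = []

    def enqueue(self, func, **kwargs):
        self.calls.append(("enqueue", None))

    def enqueue_at(self, when, func, **kwargs):
        self.calls.append(("enqueue_at", when))


def dispatch(queue, func, schedule_at=None, immediate=False, **kwargs):
    # The two options are mutually exclusive, as in the hunk above
    if schedule_at and immediate:
        raise ValueError("enqueue() cannot be called with values for both schedule_at and immediate.")
    if immediate:
        # Synchronous: the caller blocks until func returns
        func(**kwargs)
        return "ran immediately"
    if schedule_at:
        queue.enqueue_at(schedule_at, func, **kwargs)
        return "scheduled"
    queue.enqueue(func, **kwargs)
    return "pending"


queue = RecordingQueue()
ran = []
outcome = dispatch(queue, lambda **kw: ran.append(kw), immediate=True, job_id="1")
later = dispatch(queue, print, schedule_at=datetime(2030, 1, 1), job_id="2")
```

With `immediate=True` the callable runs in the calling process and nothing reaches the queue, which is why the docstring reserves this option for interactive management commands.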
