15692: Introduce background jobs #16927

Merged 38 commits on Jul 30, 2024
Changes from 29 commits
Commits
5fab8e4
Introduce reusable BackgroundJob framework
alehaa Jun 17, 2024
957bc3d
Restore using import_string for jobs
alehaa Jun 20, 2024
db591d4
Use SyncDataSourceJob for management command
alehaa Jun 20, 2024
53a4420
Implement BackgroundJob for running scripts
alehaa Jun 21, 2024
7fb1875
Fix documentation of model features
alehaa Jun 30, 2024
212262d
Ensure consitent code style
alehaa Jun 30, 2024
9dc6099
Introduce reusable ScheduledJob
alehaa Jul 1, 2024
4880d81
Introduce reusable SystemJob
alehaa Jul 1, 2024
d78ddfc
Add documentation for jobs framework
alehaa Jul 1, 2024
fd8d537
Merge branch 'feature' into 15692-background-jobs
alehaa Jul 16, 2024
15f888c
Revert "Use SyncDataSourceJob for management"
alehaa Jul 24, 2024
7d15ec0
Merge enqueued status into JobStatusChoices
alehaa Jul 24, 2024
9f1989c
Fix logger for ScriptJob
alehaa Jul 24, 2024
257976d
Remove job name for scripts
alehaa Jul 24, 2024
58089c7
Merge ScheduledJob into BackgroundJob
alehaa Jul 24, 2024
fb75389
Add name attribute for BackgroundJob
alehaa Jul 24, 2024
654e6e7
Drop enqueue_sync_job() method from DataSource
jeremystretch Jul 24, 2024
62380fb
Import ScriptJob directly
jeremystretch Jul 24, 2024
d6432fb
Relax requirement for Jobs to reference a specific object
jeremystretch Jul 24, 2024
b3f122a
Rename 'run_now' arg on Job.enqueue() to 'immediate'
jeremystretch Jul 24, 2024
3e1cc1b
Merge branch '15692-cherry' into 15692-background-jobs
alehaa Jul 25, 2024
bcad8cf
Fix queue lookup in Job enqueue
alehaa Jul 25, 2024
0b15ecf
Collapse SystemJob into BackgroundJob
alehaa Jul 25, 2024
309ad29
Remove legacy JobResultStatusChoices
alehaa Jul 25, 2024
b17b205
Use queue 'low' for system jobs by default
alehaa Jul 25, 2024
60e4e81
Add test cases for BackgroundJob handling
alehaa Jul 25, 2024
bd4a21c
Fix enqueue interval jobs
alehaa Jul 25, 2024
4c2ba09
Honor schedule_at for job's enqueue_once
alehaa Jul 25, 2024
c047bf4
Switch BackgroundJob to regular methods
alehaa Jul 25, 2024
e65e87c
Fix background tasks documentation
alehaa Jul 25, 2024
3fc3d37
Test enqueue in combination with enqueue_once
alehaa Jul 25, 2024
cecc2b8
Rename background jobs to tasks (to differentiate from RQ)
jeremystretch Jul 29, 2024
c098d1c
Touch up docs
jeremystretch Jul 29, 2024
b9cf078
Revert "Use queue 'low' for system jobs by default"
alehaa Jul 29, 2024
32ebe7b
Remove system background job
alehaa Jul 30, 2024
ecf8e79
Fix runscript management command
alehaa Jul 30, 2024
7f0a4e3
Use regular imports for ScriptJob
alehaa Jul 30, 2024
85b9f65
Rename BackgroundJob to JobRunner
jeremystretch Jul 30, 2024
2 changes: 1 addition & 1 deletion docs/development/models.md
@@ -18,7 +18,7 @@ Depending on its classification, each NetBox model may support various features
| [Custom links](../customization/custom-links.md) | `CustomLinksMixin` | `custom_links` | These models support the assignment of custom links |
| [Custom validation](../customization/custom-validation.md) | `CustomValidationMixin` | - | Supports the enforcement of custom validation rules |
| [Export templates](../customization/export-templates.md) | `ExportTemplatesMixin` | `export_templates` | Users can create custom export templates for these models |
| [Job results](../features/background-jobs.md) | `JobsMixin` | `jobs` | Users can create custom export templates for these models |
| [Job results](../features/background-jobs.md) | `JobsMixin` | `jobs` | Background jobs can be scheduled for these models |
| [Journaling](../features/journaling.md) | `JournalingMixin` | `journaling` | These models support persistent historical commentary |
| [Synchronized data](../integrations/synchronized-data.md) | `SyncedDataMixin` | `synced_data` | Certain model data can be automatically synchronized from a remote data source |
| [Tagging](../models/extras/tag.md) | `TagsMixin` | `tags` | The models can be tagged with user-defined tags |
81 changes: 80 additions & 1 deletion docs/plugins/development/background-tasks.md
@@ -1,6 +1,85 @@
# Background Tasks

NetBox supports the queuing of tasks that need to be performed in the background, decoupled from the request-response cycle, using the [Python RQ](https://python-rq.org/) library. Three task queues of differing priority are defined by default:
NetBox supports the queuing of tasks that need to be performed in the background, decoupled from the request-response cycle.

## High Level API

NetBox provides an easy-to-use interface for programming and managing different types of jobs, which can be used to perform any kind of background task. All job types inherit the same general job logic, but each fulfills a specific purpose and has its own management logic around it.

### Background Job

A background job implements a basic [Job](../../models/core/job.md) executor for all kinds of tasks. It handles the management of the associated job object, the rescheduling of periodic jobs at the given interval, and error handling. Custom jobs are added by subclassing NetBox's `BackgroundJob` class.

#### Example

```python title="jobs.py"
from utilities.jobs import BackgroundJob

class MyTestJob(BackgroundJob):
    class Meta:
        name = "My Test Job"

    def run(self, *args, **kwargs):
        obj = self.job.object
        # your logic goes here
```

You can schedule the background job from within your code (e.g. from a model's `save()` method or a view) by calling `MyTestJob.enqueue()`. This method passes all of its arguments through to `Job.enqueue()`. However, the `name` argument must not be passed, as the background job's name will be used instead.

::: core.models.Job.enqueue
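
The name forwarding described above can be illustrated with a small, self-contained sketch. `FakeJob` and `SketchBackgroundJob` are hypothetical stand-ins written for this example, not NetBox's classes, and the real implementation may differ; the sketch only shows the idea that `enqueue()` passes its arguments through while supplying the name from `Meta` (or the class name as a fallback).

```python
class FakeJob:
    """Hypothetical stand-in for core.models.Job; it only records calls."""
    enqueued = []

    @classmethod
    def enqueue(cls, func, instance=None, name='', user=None, **kwargs):
        record = {'func': func, 'instance': instance, 'name': name, 'user': user, 'kwargs': kwargs}
        cls.enqueued.append(record)
        return record


class SketchBackgroundJob:
    """Hypothetical base class that forwards enqueue() and supplies the name."""

    class Meta:
        pass

    @classmethod
    def get_name(cls):
        # Fall back to the class name when Meta.name is omitted
        return getattr(cls.Meta, 'name', cls.__name__)

    @classmethod
    def handle(cls, *args, **kwargs):
        # Placeholder for the callable that a worker would execute
        pass

    @classmethod
    def enqueue(cls, *args, **kwargs):
        # The job name comes from the class, so callers must not pass one
        if 'name' in kwargs:
            raise TypeError("name is derived from the job class and must not be passed")
        return FakeJob.enqueue(cls.handle, *args, name=cls.get_name(), **kwargs)


class MyTestJob(SketchBackgroundJob):
    class Meta:
        name = "My Test Job"


class UnnamedJob(SketchBackgroundJob):
    pass


record = MyTestJob.enqueue(instance="device-1", user="admin")
```

In this sketch, `record['name']` is taken from `Meta.name`, while `UnnamedJob.enqueue()` falls back to the class name.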

#### Job Attributes

Background job attributes are defined under a class named `Meta` within the job. These are optional, but encouraged.

##### `name`

This is the human-friendly name of your background job. If omitted, the class name will be used.

### Scheduled Jobs

As described above, jobs can be scheduled for immediate execution or at any later time using the `enqueue()` method. However, for management purposes, the `enqueue_once()` method allows a job to be scheduled exactly once, avoiding duplicates. If a job is already scheduled for a particular instance, a second one won't be scheduled, and this check respects thread safety. A typical use case is a periodic task that is bound to an instance in general, but not to any event of that instance (such as updates). The parameters of the `enqueue_once()` method are identical to those of `enqueue()`.

!!! tip
    You can still `enqueue()` additional jobs while an interval schedule is active. For example, you might schedule a periodic daily synchronization but also trigger additional synchronizations on demand when the user presses a button.
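
The difference between `enqueue()` and `enqueue_once()` can be sketched with an in-memory model. Everything here (`FakeJob`, `FakeScheduler`, the status strings) is hypothetical and deliberately simplified: it returns the already-scheduled job instead of creating a duplicate, and it makes no attempt at the thread safety that a real implementation needs.

```python
from dataclasses import dataclass
from itertools import count
from typing import Optional

# Statuses treated as "still enqueued" in this sketch (assumed names)
ENQUEUED_STATES = ("pending", "scheduled", "running")


@dataclass
class FakeJob:
    pk: int
    name: str
    instance: object
    interval: Optional[int] = None
    status: str = "pending"


class FakeScheduler:
    """Hypothetical in-memory scheduler used only to illustrate the semantics."""

    def __init__(self):
        self.jobs = []
        self._ids = count(1)

    def enqueue(self, name, instance=None, interval=None):
        # Always creates a new job, even if a similar one already exists
        job = FakeJob(pk=next(self._ids), name=name, instance=instance, interval=interval)
        self.jobs.append(job)
        return job

    def enqueue_once(self, name, instance=None, interval=None):
        # Reuse a job that is still enqueued for the same name and instance
        for job in self.jobs:
            if job.name == name and job.instance == instance and job.status in ENQUEUED_STATES:
                return job
        return self.enqueue(name, instance=instance, interval=interval)


scheduler = FakeScheduler()
first = scheduler.enqueue_once("Synchronization", instance="datasource-1", interval=1440)
second = scheduler.enqueue_once("Synchronization", instance="datasource-1", interval=1440)
on_demand = scheduler.enqueue("Synchronization", instance="datasource-1")
```

Here `first` and `second` refer to the same job, while the explicit `enqueue()` call adds a second, on-demand job alongside the periodic one.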

### System Jobs

A system background job is not bound to any particular NetBox object. A typical use case for these jobs is a general synchronization of NetBox objects from another system or housekeeping.

The `setup()` method can be used to set up a new scheduled job outside the request-response cycle. It can be safely called from the plugin's `ready()` method and will register the new schedule right after all plugins are loaded and the database is connected.

!!! note
    The default system background job queue is `low`. It can be changed using the [`QUEUE_MAPPINGS`](../../configuration/miscellaneous.md#queue_mappings) setting by using `None` as the model.

#### Example

```python title="jobs.py"
from utilities.jobs import BackgroundJob

class MyHousekeepingJob(BackgroundJob):
    class Meta:
        name = "Housekeeping"

    def run(self, *args, **kwargs):
        # your logic goes here
        pass
```
```python title="__init__.py"
from netbox.plugins import PluginConfig

class MyPluginConfig(PluginConfig):
    def ready(self):
        from .jobs import MyHousekeepingJob
        MyHousekeepingJob.setup(interval=60)
```

## Low Level API

Instead of using the high-level APIs provided by NetBox, plugins may access the task scheduler directly using the [Python RQ](https://python-rq.org/) library. This allows scheduling background tasks without adding a [Job](../../models/core/job.md) to the database or implementing custom job handling.

## Task queues

Three task queues of differing priority are defined by default:

* High
* Default
1 change: 1 addition & 0 deletions docs/plugins/development/index.md
@@ -47,6 +47,7 @@ project-name/
- __init__.py
- filtersets.py
- graphql.py
- jobs.py
- models.py
- middleware.py
- navigation.py
2 changes: 2 additions & 0 deletions docs/plugins/development/models.md
@@ -130,6 +130,8 @@ For more information about database migrations, see the [Django documentation](h

::: netbox.models.features.ExportTemplatesMixin

::: netbox.models.features.JobsMixin

::: netbox.models.features.JournalingMixin

::: netbox.models.features.TagsMixin
8 changes: 7 additions & 1 deletion netbox/core/api/views.py
@@ -7,6 +7,8 @@
from rest_framework.viewsets import ReadOnlyModelViewSet

from core import filtersets
from core.choices import DataSourceStatusChoices
from core.jobs import SyncDataSourceJob
from core.models import *
from netbox.api.metadata import ContentTypeMetadata
from netbox.api.viewsets import NetBoxModelViewSet, NetBoxReadOnlyModelViewSet
@@ -36,7 +38,11 @@ def sync(self, request, pk):
if not request.user.has_perm('core.sync_datasource', obj=datasource):
raise PermissionDenied(_("This user does not have permission to synchronize this data source."))

datasource.enqueue_sync_job(request)
# Enqueue the sync job & update the DataSource's status
SyncDataSourceJob.enqueue(instance=datasource, user=request.user)
datasource.status = DataSourceStatusChoices.QUEUED
DataSource.objects.filter(pk=datasource.pk).update(status=datasource.status)

serializer = serializers.DataSourceSerializer(datasource, context={'request': request})

return Response(serializer.data)
6 changes: 6 additions & 0 deletions netbox/core/choices.py
@@ -59,6 +59,12 @@ class JobStatusChoices(ChoiceSet):
(STATUS_FAILED, _('Failed'), 'red'),
)

ENQUEUED_STATE_CHOICES = (
STATUS_PENDING,
STATUS_SCHEDULED,
STATUS_RUNNING,
)

TERMINAL_STATE_CHOICES = (
STATUS_COMPLETED,
STATUS_ERRORED,
32 changes: 16 additions & 16 deletions netbox/core/jobs.py
@@ -1,33 +1,33 @@
import logging

from netbox.search.backends import search_backend
from .choices import *
from utilities.jobs import BackgroundJob
from .choices import DataSourceStatusChoices
from .exceptions import SyncError
from .models import DataSource
from rq.timeouts import JobTimeoutException

logger = logging.getLogger(__name__)


def sync_datasource(job, *args, **kwargs):
class SyncDataSourceJob(BackgroundJob):
"""
Call sync() on a DataSource.
"""
datasource = DataSource.objects.get(pk=job.object_id)

try:
job.start()
datasource.sync()
class Meta:
name = 'Synchronization'

# Update the search cache for DataFiles belonging to this source
search_backend.cache(datasource.datafiles.iterator())
def run(self, *args, **kwargs):
datasource = DataSource.objects.get(pk=self.job.object_id)

job.terminate()
try:
datasource.sync()

except Exception as e:
job.terminate(status=JobStatusChoices.STATUS_ERRORED, error=repr(e))
DataSource.objects.filter(pk=datasource.pk).update(status=DataSourceStatusChoices.FAILED)
if type(e) in (SyncError, JobTimeoutException):
logging.error(e)
else:
# Update the search cache for DataFiles belonging to this source
search_backend.cache(datasource.datafiles.iterator())

except Exception as e:
DataSource.objects.filter(pk=datasource.pk).update(status=DataSourceStatusChoices.FAILED)
if type(e) is SyncError:
logging.error(e)
raise e
24 changes: 24 additions & 0 deletions netbox/core/migrations/0012_job_object_type_optional.py
@@ -0,0 +1,24 @@
import django.db.models.deletion
from django.db import migrations, models


class Migration(migrations.Migration):

    dependencies = [
        ('contenttypes', '0002_remove_content_type_name'),
        ('core', '0011_move_objectchange'),
    ]

    operations = [
        migrations.AlterField(
            model_name='job',
            name='object_type',
            field=models.ForeignKey(
                blank=True,
                null=True,
                on_delete=django.db.models.deletion.CASCADE,
                related_name='jobs',
                to='contenttypes.contenttype'
            ),
        ),
    ]
19 changes: 1 addition & 18 deletions netbox/core/models/data.py
@@ -1,18 +1,17 @@
import hashlib
import logging
import os
import yaml
from fnmatch import fnmatchcase
from urllib.parse import urlparse

import yaml
from django.conf import settings
from django.contrib.contenttypes.fields import GenericForeignKey
from django.core.exceptions import ValidationError
from django.core.validators import RegexValidator
from django.db import models
from django.urls import reverse
from django.utils import timezone
from django.utils.module_loading import import_string
from django.utils.translation import gettext as _

from netbox.constants import CENSOR_TOKEN, CENSOR_TOKEN_CHANGED
@@ -23,7 +22,6 @@
from ..choices import *
from ..exceptions import SyncError
from ..signals import post_sync, pre_sync
from .jobs import Job

__all__ = (
'AutoSyncRecord',
@@ -153,21 +151,6 @@ def to_objectchange(self, action):

return objectchange

def enqueue_sync_job(self, request):
"""
Enqueue a background job to synchronize the DataSource by calling sync().
"""
# Set the status to "syncing"
self.status = DataSourceStatusChoices.QUEUED
DataSource.objects.filter(pk=self.pk).update(status=self.status)

# Enqueue a sync job
return Job.enqueue(
import_string('core.jobs.sync_datasource'),
instance=self,
user=request.user
)

def get_backend(self):
backend_params = self.parameters or {}
return self.backend_class(self.source_url, **backend_params)
31 changes: 25 additions & 6 deletions netbox/core/models/jobs.py
@@ -31,6 +31,8 @@ class Job(models.Model):
to='contenttypes.ContentType',
related_name='jobs',
on_delete=models.CASCADE,
blank=True,
null=True
)
object_id = models.PositiveBigIntegerField(
blank=True,
@@ -197,25 +199,34 @@ def terminate(self, status=JobStatusChoices.STATUS_COMPLETED, error=None):
job_end.send(self)

@classmethod
def enqueue(cls, func, instance, name='', user=None, schedule_at=None, interval=None, **kwargs):
def enqueue(cls, func, instance=None, name='', user=None, schedule_at=None, interval=None, immediate=False, **kwargs):
"""
Create a Job instance and enqueue a job using the given callable

Args:
func: The callable object to be enqueued for execution
instance: The NetBox object to which this job pertains
instance: The NetBox object to which this job pertains (optional)
name: Name for the job (optional)
user: The user responsible for running the job
schedule_at: Schedule the job to be executed at the passed date and time
interval: Recurrence interval (in minutes)
immediate: Run the job immediately without scheduling it in the background. Should be used for interactive
management commands only.
"""
object_type = ObjectType.objects.get_for_model(instance, for_concrete_model=False)
rq_queue_name = get_queue_for_model(object_type.model)
if schedule_at and immediate:
raise ValueError("enqueue() cannot be called with values for both schedule_at and immediate.")
Review comment (Collaborator): Should be internationalized with _(


if instance:
object_type = ObjectType.objects.get_for_model(instance, for_concrete_model=False)
object_id = instance.pk
else:
object_type = object_id = None
rq_queue_name = get_queue_for_model(object_type.model if object_type else None)
queue = django_rq.get_queue(rq_queue_name)
status = JobStatusChoices.STATUS_SCHEDULED if schedule_at else JobStatusChoices.STATUS_PENDING
job = Job.objects.create(
object_type=object_type,
object_id=instance.pk,
object_id=object_id,
name=name,
status=status,
scheduled=schedule_at,
@@ -224,8 +235,16 @@ def enqueue(cls, func, instance, name='', user=None, schedule_at=None, interval=
job_id=uuid.uuid4()
)

if schedule_at:
# Run the job immediately, rather than enqueuing it as a background task. Note that this is a synchronous
# (blocking) operation, and execution will pause until the job completes.
if immediate:
func(job_id=str(job.job_id), job=job, **kwargs)

# Schedule the job to run at a specific date & time.
elif schedule_at:
queue.enqueue_at(schedule_at, func, job_id=str(job.job_id), job=job, **kwargs)

# Schedule the job to run asynchronously at this first available opportunity.
else:
queue.enqueue(func, job_id=str(job.job_id), job=job, **kwargs)
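
The three-way dispatch in this hunk (run immediately, schedule for a given time, or queue for the next available worker) can be tried out in isolation with a minimal sketch. `FakeQueue` and `dispatch()` are hypothetical names made up for this example; they only mirror the branch order shown in the diff and leave out creation of the `Job` record and the real RQ queue.

```python
from datetime import datetime, timedelta


class FakeQueue:
    """Hypothetical in-memory stand-in for an RQ queue; it records calls only."""

    def __init__(self):
        self.calls = []

    def enqueue(self, func, **kwargs):
        self.calls.append(("enqueue", None))

    def enqueue_at(self, when, func, **kwargs):
        self.calls.append(("enqueue_at", when))


def dispatch(queue, func, schedule_at=None, immediate=False, **kwargs):
    """Return the name of the branch taken, following the order in the diff."""
    if schedule_at and immediate:
        raise ValueError("schedule_at and immediate are mutually exclusive")

    # Synchronous (blocking) execution in the calling process
    if immediate:
        func(**kwargs)
        return "immediate"

    # Deferred execution at a specific date and time
    elif schedule_at:
        queue.enqueue_at(schedule_at, func, **kwargs)
        return "scheduled"

    # Asynchronous execution at the first available opportunity
    queue.enqueue(func, **kwargs)
    return "queued"


queue = FakeQueue()
ran = []


def task(**kwargs):
    ran.append(kwargs)


when = datetime.now() + timedelta(hours=1)
branches = [
    dispatch(queue, task, immediate=True, job_id="a"),
    dispatch(queue, task, schedule_at=when, job_id="b"),
    dispatch(queue, task, job_id="c"),
]
```

Only the `immediate=True` call runs `task` in the calling process; the other two calls merely hand the callable to the queue.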

8 changes: 7 additions & 1 deletion netbox/core/views.py
@@ -35,6 +35,8 @@
from utilities.query import count_related
from utilities.views import ContentTypePermissionRequiredMixin, GetRelatedModelsMixin, register_model_view
from . import filtersets, forms, tables
from .choices import DataSourceStatusChoices
from .jobs import SyncDataSourceJob
from .models import *


@@ -75,7 +77,11 @@ def get(self, request, pk):

def post(self, request, pk):
datasource = get_object_or_404(self.queryset, pk=pk)
job = datasource.enqueue_sync_job(request)

# Enqueue the sync job & update the DataSource's status
job = SyncDataSourceJob.enqueue(instance=datasource, user=request.user)
datasource.status = DataSourceStatusChoices.QUEUED
DataSource.objects.filter(pk=datasource.pk).update(status=datasource.status)

messages.success(request, f"Queued job #{job.pk} to sync {datasource}")
return redirect(datasource.get_absolute_url())
7 changes: 3 additions & 4 deletions netbox/extras/api/views.py
@@ -1,5 +1,6 @@
from django.http import Http404
from django.shortcuts import get_object_or_404
from django.utils.module_loading import import_string
from django_rq.queues import get_connection
from rest_framework import status
from rest_framework.decorators import action
@@ -14,7 +15,6 @@
from core.models import Job, ObjectType
from extras import filtersets
from extras.models import *
from extras.scripts import run_script
from netbox.api.authentication import IsAuthenticatedOrLoginNotRequired
from netbox.api.features import SyncedDataMixin
from netbox.api.metadata import ContentTypeMetadata
@@ -273,10 +273,9 @@ def post(self, request, pk):
raise RQWorkerNotRunningException()

if input_serializer.is_valid():
Job.enqueue(
run_script,
ScriptJob = import_string("extras.jobs.ScriptJob")
ScriptJob.enqueue(
instance=script,
name=script.python_class.class_name,
user=request.user,
data=input_serializer.data['data'],
request=copy_safe_request(request),