Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Closes #12068: Establish a direct relationship from jobs to objects #12075

Merged
merged 9 commits into from
Mar 28, 2023
4 changes: 2 additions & 2 deletions netbox/core/api/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,6 @@ class JobSerializer(BaseModelSerializer):
class Meta:
model = Job
fields = [
'id', 'url', 'display', 'status', 'created', 'scheduled', 'interval', 'started', 'completed', 'name',
'object_type', 'user', 'data', 'job_id',
'id', 'url', 'display', 'object_type', 'object_id', 'name', 'status', 'created', 'scheduled', 'interval',
'started', 'completed', 'user', 'data', 'job_id',
]
2 changes: 1 addition & 1 deletion netbox/core/filtersets.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ class JobFilterSet(BaseFilterSet):

class Meta:
model = Job
fields = ('id', 'interval', 'status', 'user', 'object_type', 'name')
fields = ('id', 'object_type', 'object_id', 'name', 'interval', 'status', 'user')

def search(self, queryset, name, value):
if not value.strip():
Expand Down
11 changes: 5 additions & 6 deletions netbox/core/jobs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import logging

from .choices import JobStatusChoices
from netbox.search.backends import search_backend
from .choices import *
from .exceptions import SyncError
Expand All @@ -9,22 +8,22 @@
logger = logging.getLogger(__name__)


def sync_datasource(job_result, *args, **kwargs):
def sync_datasource(job, *args, **kwargs):
"""
Call sync() on a DataSource.
"""
datasource = DataSource.objects.get(name=job_result.name)
datasource = DataSource.objects.get(pk=job.object_id)

try:
job_result.start()
job.start()
datasource.sync()

# Update the search cache for DataFiles belonging to this source
search_backend.cache(datasource.datafiles.iterator())

job_result.terminate()
job.terminate()

except SyncError as e:
job_result.terminate(status=JobStatusChoices.STATUS_ERRORED)
job.terminate(status=JobStatusChoices.STATUS_ERRORED)
DataSource.objects.filter(pk=datasource.pk).update(status=DataSourceStatusChoices.FAILED)
logging.error(e)
14 changes: 6 additions & 8 deletions netbox/core/models/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from urllib.parse import urlparse

from django.conf import settings
from django.contrib.contenttypes.models import ContentType
from django.contrib.contenttypes.fields import GenericRelation
from django.core.exceptions import ValidationError
from django.core.validators import RegexValidator
from django.db import models
Expand All @@ -15,6 +15,7 @@
from django.utils.translation import gettext as _

from netbox.models import PrimaryModel
from netbox.models.features import JobsMixin
from netbox.registry import registry
from utilities.files import sha256_hash
from utilities.querysets import RestrictedQuerySet
Expand All @@ -31,7 +32,7 @@
logger = logging.getLogger('netbox.core.data')


class DataSource(PrimaryModel):
class DataSource(JobsMixin, PrimaryModel):
"""
A remote source, such as a git repository, from which DataFiles are synchronized.
"""
Expand Down Expand Up @@ -118,15 +119,12 @@ def enqueue_sync_job(self, request):
DataSource.objects.filter(pk=self.pk).update(status=self.status)

# Enqueue a sync job
job_result = Job.enqueue_job(
return Job.enqueue(
import_string('core.jobs.sync_datasource'),
name=self.name,
obj_type=ContentType.objects.get_for_model(DataSource),
user=request.user,
instance=self,
user=request.user
)

return job_result

def get_backend(self):
backend_cls = registry['data_backends'].get(self.type)
backend_params = self.parameters or {}
Expand Down
46 changes: 24 additions & 22 deletions netbox/core/models/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from django.core.validators import MinValueValidator
from django.db import models
from django.urls import reverse
from django.urls.exceptions import NoReverseMatch
from django.utils import timezone
from django.utils.translation import gettext as _

Expand Down Expand Up @@ -96,21 +95,12 @@ class Meta:
def __str__(self):
return str(self.job_id)

def delete(self, *args, **kwargs):
super().delete(*args, **kwargs)

rq_queue_name = get_config().QUEUE_MAPPINGS.get(self.object_type.model, RQ_QUEUE_DEFAULT)
queue = django_rq.get_queue(rq_queue_name)
job = queue.fetch_job(str(self.job_id))

if job:
job.cancel()

def get_absolute_url(self):
try:
return reverse(f'extras:{self.object_type.model}_result', args=[self.pk])
except NoReverseMatch:
return None
# TODO: Employ dynamic registration
if self.object_type.model == 'reportmodule':
return reverse(f'extras:report_result', kwargs={'job_pk': self.pk})
if self.object_type.model == 'scriptmodule':
return reverse(f'extras:script_result', kwargs={'job_pk': self.pk})

def get_status_color(self):
return JobStatusChoices.colors.get(self.status)
Expand All @@ -130,6 +120,16 @@ def duration(self):

return f"{int(minutes)} minutes, {seconds:.2f} seconds"

def delete(self, *args, **kwargs):
super().delete(*args, **kwargs)

rq_queue_name = get_config().QUEUE_MAPPINGS.get(self.object_type.model, RQ_QUEUE_DEFAULT)
queue = django_rq.get_queue(rq_queue_name)
job = queue.fetch_job(str(self.job_id))

if job:
job.cancel()

def start(self):
"""
Record the job's start time and update its status to "running."
Expand Down Expand Up @@ -162,35 +162,37 @@ def terminate(self, status=JobStatusChoices.STATUS_COMPLETED):
self.trigger_webhooks(event=EVENT_JOB_END)

@classmethod
def enqueue_job(cls, func, name, obj_type, user, schedule_at=None, interval=None, *args, **kwargs):
def enqueue(cls, func, instance, name='', user=None, schedule_at=None, interval=None, **kwargs):
"""
Create a Job instance and enqueue a job using the given callable

Args:
func: The callable object to be enqueued for execution
instance: The NetBox object to which this job pertains
name: Name for the job (optional)
obj_type: ContentType to link to the Job instance object_type
user: User object to link to the Job instance
user: The user responsible for running the job
schedule_at: Schedule the job to be executed at the passed date and time
interval: Recurrence interval (in minutes)
"""
rq_queue_name = get_queue_for_model(obj_type.model)
object_type = ContentType.objects.get_for_model(instance, for_concrete_model=False)
rq_queue_name = get_queue_for_model(object_type.model)
queue = django_rq.get_queue(rq_queue_name)
status = JobStatusChoices.STATUS_SCHEDULED if schedule_at else JobStatusChoices.STATUS_PENDING
job = Job.objects.create(
object_type=object_type,
object_id=instance.pk,
name=name,
status=status,
object_type=obj_type,
scheduled=schedule_at,
interval=interval,
user=user,
job_id=uuid.uuid4()
)

if schedule_at:
queue.enqueue_at(schedule_at, func, job_id=str(job.job_id), job_result=job, **kwargs)
queue.enqueue_at(schedule_at, func, job_id=str(job.job_id), job=job, **kwargs)
else:
queue.enqueue(func, job_id=str(job.job_id), job_result=job, **kwargs)
queue.enqueue(func, job_id=str(job.job_id), job=job, **kwargs)

return job

Expand Down
13 changes: 9 additions & 4 deletions netbox/core/tables/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,18 @@


class JobTable(NetBoxTable):
id = tables.Column(
linkify=True
)
name = tables.Column(
linkify=True
)
object_type = columns.ContentTypeColumn(
verbose_name=_('Type')
)
object = tables.Column(
linkify=True
)
status = columns.ChoiceFieldColumn()
created = columns.DateTimeColumn()
scheduled = columns.DateTimeColumn()
Expand All @@ -25,10 +31,9 @@ class JobTable(NetBoxTable):
class Meta(NetBoxTable.Meta):
model = Job
fields = (
'pk', 'id', 'object_type', 'name', 'status', 'created', 'scheduled', 'interval', 'started', 'completed',
'user', 'job_id',
'pk', 'id', 'object_type', 'object', 'name', 'status', 'created', 'scheduled', 'interval', 'started',
'completed', 'user', 'job_id',
)
default_columns = (
'pk', 'id', 'object_type', 'name', 'status', 'created', 'scheduled', 'interval', 'started', 'completed',
'user',
'pk', 'id', 'object_type', 'object', 'name', 'status', 'created', 'started', 'completed', 'user',
)
4 changes: 2 additions & 2 deletions netbox/core/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ def get(self, request, pk):

def post(self, request, pk):
datasource = get_object_or_404(self.queryset, pk=pk)
job_result = datasource.enqueue_sync_job(request)
job = datasource.enqueue_sync_job(request)

messages.success(request, f"Queued job #{job_result.pk} to sync {datasource}")
messages.success(request, f"Queued job #{job.pk} to sync {datasource}")
return redirect(datasource.get_absolute_url())


Expand Down
62 changes: 33 additions & 29 deletions netbox/extras/api/views.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from django.contrib.contenttypes.models import ContentType
from django.http import Http404
from django.shortcuts import get_object_or_404
from django_rq.queues import get_connection
from rest_framework import status
from rest_framework.decorators import action
Expand All @@ -16,8 +17,8 @@
from core.models import Job
from extras import filtersets
from extras.models import *
from extras.reports import get_report, run_report
from extras.scripts import get_script, run_script
from extras.reports import get_module_and_report, run_report
from extras.scripts import get_module_and_script, run_script
from netbox.api.authentication import IsAuthenticatedOrLoginNotRequired
from netbox.api.features import SyncedDataMixin
from netbox.api.metadata import ContentTypeMetadata
Expand Down Expand Up @@ -170,19 +171,17 @@ class ReportViewSet(ViewSet):
exclude_from_schema = True
lookup_value_regex = '[^/]+' # Allow dots

def _retrieve_report(self, pk):

# Read the PK as "<module>.<report>"
if '.' not in pk:
def _get_report(self, pk):
try:
module_name, report_name = pk.split('.', maxsplit=1)
except ValueError:
raise Http404
module_name, report_name = pk.split('.', maxsplit=1)

# Raise a 404 on an invalid Report module/name
report = get_report(module_name, report_name)
module, report = get_module_and_report(module_name, report_name)
if report is None:
raise Http404

return report
return module, report

def list(self, request):
"""
Expand Down Expand Up @@ -215,13 +214,13 @@ def retrieve(self, request, pk):
"""
Retrieve a single Report identified as "<module>.<report>".
"""
module, report = self._get_report(pk)

# Retrieve the Report and Job, if any.
report = self._retrieve_report(pk)
report_content_type = ContentType.objects.get(app_label='extras', model='report')
object_type = ContentType.objects.get(app_label='extras', model='reportmodule')
report.result = Job.objects.filter(
object_type=report_content_type,
name=report.full_name,
object_type=object_type,
name=report.name,
status__in=JobStatusChoices.TERMINAL_STATE_CHOICES
).first()

Expand All @@ -245,14 +244,14 @@ def run(self, request, pk):
raise RQWorkerNotRunningException()

# Retrieve and run the Report. This will create a new Job.
report = self._retrieve_report(pk)
module, report = self._get_report(pk)
input_serializer = serializers.ReportInputSerializer(data=request.data)

if input_serializer.is_valid():
report.result = Job.enqueue_job(
report.result = Job.enqueue(
run_report,
name=report.full_name,
obj_type=ContentType.objects.get_for_model(Report),
instance=module,
name=report.class_name,
user=request.user,
job_timeout=report.job_timeout,
schedule_at=input_serializer.validated_data.get('schedule_at'),
Expand All @@ -275,11 +274,16 @@ class ScriptViewSet(ViewSet):
lookup_value_regex = '[^/]+' # Allow dots

def _get_script(self, pk):
module_name, script_name = pk.split('.', maxsplit=1)
script = get_script(module_name, script_name)
try:
module_name, script_name = pk.split('.', maxsplit=1)
except ValueError:
raise Http404

module, script = get_module_and_script(module_name, script_name)
if script is None:
raise Http404
return script

return module, script

def list(self, request):

Expand All @@ -305,11 +309,11 @@ def list(self, request):
return Response(serializer.data)

def retrieve(self, request, pk):
script = self._get_script(pk)
script_content_type = ContentType.objects.get(app_label='extras', model='script')
module, script = self._get_script(pk)
object_type = ContentType.objects.get(app_label='extras', model='scriptmodule')
script.result = Job.objects.filter(
object_type=script_content_type,
name=script.full_name,
object_type=object_type,
name=script.name,
status__in=JobStatusChoices.TERMINAL_STATE_CHOICES
).first()
serializer = serializers.ScriptDetailSerializer(script, context={'request': request})
Expand All @@ -324,18 +328,18 @@ def post(self, request, pk):
if not request.user.has_perm('extras.run_script'):
raise PermissionDenied("This user does not have permission to run scripts.")

script = self._get_script(pk)()
module, script = self._get_script(pk)
input_serializer = serializers.ScriptInputSerializer(data=request.data)

# Check that at least one RQ worker is running
if not Worker.count(get_connection('default')):
raise RQWorkerNotRunningException()

if input_serializer.is_valid():
script.result = Job.enqueue_job(
script.result = Job.enqueue(
run_script,
name=script.full_name,
obj_type=ContentType.objects.get_for_model(Script),
instance=module,
name=script.class_name,
user=request.user,
data=input_serializer.data['data'],
request=copy_safe_request(request),
Expand Down
Loading