From b65d9943976526af174308da6a8bd0b95b81d448 Mon Sep 17 00:00:00 2001 From: Jeremy Stretch Date: Wed, 20 Dec 2017 13:04:00 -0500 Subject: [PATCH] Fixes #1136: Enforce model validation during bulk update --- netbox/utilities/views.py | 123 ++++++++++++++++---------------------- 1 file changed, 50 insertions(+), 73 deletions(-) diff --git a/netbox/utilities/views.py b/netbox/utilities/views.py index ad4966b13c2..e7ee4bdb72b 100644 --- a/netbox/utilities/views.py +++ b/netbox/utilities/views.py @@ -9,10 +9,10 @@ from django.core.exceptions import ValidationError from django.db import transaction, IntegrityError from django.db.models import ProtectedError -from django.forms import CharField, Form, ModelMultipleChoiceField, MultipleHiddenInput, Textarea, TypedChoiceField +from django.forms import CharField, Form, ModelMultipleChoiceField, MultipleHiddenInput, Textarea from django.http import HttpResponse from django.shortcuts import get_object_or_404, redirect, render -from django.template import TemplateSyntaxError +from django.template.exceptions import TemplateSyntaxError from django.urls import reverse from django.utils.html import escape from django.utils.http import is_safe_url @@ -514,31 +514,55 @@ def post(self, request, **kwargs): custom_fields = form.custom_fields if hasattr(form, 'custom_fields') else [] standard_fields = [field for field in form.fields if field not in custom_fields and field != 'pk'] - - # Update standard fields. If a field is listed in _nullify, delete its value. nullified_fields = request.POST.getlist('_nullify') - fields_to_update = {} - for field in standard_fields: - if field in form.nullable_fields and field in nullified_fields: - if isinstance(form.fields[field], CharField): - fields_to_update[field] = '' - else: - fields_to_update[field] = None - elif form.cleaned_data[field] not in (None, ''): - fields_to_update[field] = form.cleaned_data[field] - updated_count = self.cls.objects.filter(pk__in=pk_list).update(**fields_to_update) - - # Update custom fields for objects - if custom_fields: - objs_updated = self.update_custom_fields(pk_list, form, custom_fields, nullified_fields) - if objs_updated and not updated_count: - updated_count = objs_updated - - if updated_count: - msg = 'Updated {} {}'.format(updated_count, self.cls._meta.verbose_name_plural) - messages.success(self.request, msg) - UserAction.objects.log_bulk_edit(request.user, ContentType.objects.get_for_model(self.cls), msg) - return redirect(return_url) + + try: + + with transaction.atomic(): + + updated_count = 0 + for obj in self.cls.objects.filter(pk__in=pk_list): + + # Update standard fields. If a field is listed in _nullify, delete its value. + for name in standard_fields: + if name in form.nullable_fields and name in nullified_fields: + setattr(obj, name, '' if isinstance(form.fields[name], CharField) else None) + elif form.cleaned_data[name] not in (None, ''): + setattr(obj, name, form.cleaned_data[name]) + obj.full_clean() + obj.save() + + # Update custom fields + obj_type = ContentType.objects.get_for_model(self.cls) + for name in custom_fields: + field = form.fields[name].model + if name in form.nullable_fields and name in nullified_fields: + CustomFieldValue.objects.filter( + field=field, obj_type=obj_type, obj_id=obj.pk + ).delete() + elif form.cleaned_data[name] not in [None, '']: + try: + cfv = CustomFieldValue.objects.get( + field=field, obj_type=obj_type, obj_id=obj.pk + ) + except CustomFieldValue.DoesNotExist: + cfv = CustomFieldValue( + field=field, obj_type=obj_type, obj_id=obj.pk + ) + cfv.value = form.cleaned_data[name] + cfv.save() + + updated_count += 1 + + if updated_count: + msg = 'Updated {} {}'.format(updated_count, self.cls._meta.verbose_name_plural) + messages.success(self.request, msg) + UserAction.objects.log_bulk_edit(request.user, ContentType.objects.get_for_model(self.cls), msg) + + return redirect(return_url) + + except ValidationError as e: + messages.error(self.request, "{} failed validation: {}".format(obj, e)) else: initial_data = request.POST.copy() @@ -559,53 +583,6 @@ def post(self, request, **kwargs): 'return_url': return_url, }) - def update_custom_fields(self, pk_list, form, fields, nullified_fields): - obj_type = ContentType.objects.get_for_model(self.cls) - objs_updated = False - - for name in fields: - - field = form.fields[name].model - - # Setting the field to null - if name in form.nullable_fields and name in nullified_fields: - - # Delete all CustomFieldValues for instances of this field belonging to the selected objects. - CustomFieldValue.objects.filter(field=field, obj_type=obj_type, obj_id__in=pk_list).delete() - objs_updated = True - - # Updating the value of the field - elif form.cleaned_data[name] not in [None, '']: - - # Check for zero value (bulk editing) - if isinstance(form.fields[name], TypedChoiceField) and form.cleaned_data[name] == 0: - serialized_value = field.serialize_value(None) - else: - serialized_value = field.serialize_value(form.cleaned_data[name]) - - # Gather any pre-existing CustomFieldValues for the objects being edited. - existing_cfvs = CustomFieldValue.objects.filter(field=field, obj_type=obj_type, obj_id__in=pk_list) - - # Determine which objects have an existing CFV to update and which need a new CFV created. - update_list = [cfv['obj_id'] for cfv in existing_cfvs.values()] - create_list = list(set(pk_list) - set(update_list)) - - # Creating/updating CFVs - if serialized_value: - existing_cfvs.update(serialized_value=serialized_value) - CustomFieldValue.objects.bulk_create([ - CustomFieldValue(field=field, obj_type=obj_type, obj_id=pk, serialized_value=serialized_value) - for pk in create_list - ]) - - # Deleting CFVs - else: - existing_cfvs.delete() - - objs_updated = True - - return len(pk_list) if objs_updated else 0 - class BulkDeleteView(View): """