From 116967f8919b2776cb720b21f18e64f78e7df6ad Mon Sep 17 00:00:00 2001 From: Peter Eckel Date: Tue, 17 Oct 2023 21:34:10 +0200 Subject: [PATCH] Fixed DNS exception handling for IP addresses with coupled DNS records --- netbox_dns/middleware.py | 289 ++++++++++-------- netbox_dns/models.py | 2 +- .../tests/coupling/test_ip_dns_coupling.py | 56 ++-- 3 files changed, 182 insertions(+), 165 deletions(-) diff --git a/netbox_dns/middleware.py b/netbox_dns/middleware.py index 032e0d15..b386e1f1 100644 --- a/netbox_dns/middleware.py +++ b/netbox_dns/middleware.py @@ -1,117 +1,181 @@ from django.db import transaction from django.db.models import signals -from django.core.exceptions import MiddlewareNotUsed, PermissionDenied +from django.core.exceptions import MiddlewareNotUsed, PermissionDenied, ValidationError from ipam.models import IPAddress +from ipam.choices import IPAddressStatusChoices from extras.plugins import get_plugin_config -from netbox_dns.models import Zone, Record, RecordTypeChoices -from utilities.exceptions import PermissionsViolation, AbortRequest +from netbox.signals import post_clean from utilities.permissions import resolve_permission +from netbox_dns.models import Zone, Record, RecordTypeChoices, RecordStatusChoices + + +def address_record_cf_data(ip_address): + name = ip_address.custom_field_data.get("ipaddress_dns_record_name") + zone_id = ip_address.custom_field_data.get("ipaddress_dns_zone_id") + + if name is None or zone_id is None: + return None, None + + return name, zone_id + + +def address_record_type(ip_address): + return RecordTypeChoices.AAAA if ip_address.family == 6 else RecordTypeChoices.A + + +def address_record_status(ip_address): + return ( + RecordStatusChoices.STATUS_ACTIVE + if ip_address.status == IPAddressStatusChoices.STATUS_ACTIVE + else RecordStatusChoices.STATUS_INACTIVE + ) + + +def new_address_record(ip_address): + name, zone_id = address_record_cf_data(ip_address) + + if zone_id is None: + return None + + return Record( + name=name, + zone_id=zone_id, + status=address_record_status(ip_address), + type=address_record_type(ip_address), + value=str(ip_address.address.ip), + ipam_ip_address_id=ip_address.id, + managed=True, + ) + + +def get_address_record(ip_address): + return ip_address.netbox_dns_records.first() + + +def update_address_record(record, ip_address): + name, zone_id = address_record_cf_data(ip_address) + + record.name = name + record.zone_id = zone_id + record.status = address_record_status(ip_address) + record.value = str(ip_address.address.ip) + + +def check_address_record(user, record, permission): + action = resolve_permission(permission)[1] + + if not user.has_perm(permission): + raise PermissionDenied() + + with transaction.atomic(): + if action in {"add", "change"}: + record.save() + + queryset = Record.objects.restrict(user=user, action=action) + if not queryset.filter(pk=record.pk).exists(): + raise PermissionDenied() + + transaction.set_rollback(True) + class Action: def __init__(self, request): self.request = request + def post_clean(self, sender, **kwargs): + ip_address = kwargs.get("instance") + user = self.request.user + + try: + if ip_address.id is None: + # + # Handle new IP addresses + # + record = new_address_record(ip_address) + if record is not None: + check_address_record(user, record, "netbox_dns.add_record") + else: + # + # Handle updates to existing IP addresses + # + record = get_address_record(ip_address) + if record is not None: + # + # Update or delete the existing address record + # + name, zone_id = address_record_cf_data(ip_address) + if zone_id is not None: + # + # Update the address record + # + update_address_record(record, ip_address) + check_address_record(user, record, "netbox_dns.change_record") + else: + # + # Delete the address record + # + check_address_record(user, record, "netbox_dns.delete_record") + else: + # + # Create a new address record + # + record = new_address_record(ip_address) + if record is not None: + check_address_record(user, record, "netbox_dns.add_record") + + except ValidationError as exc: + value = exc.error_dict.pop("name", None) + if value is not None: + exc.error_dict["cf_ipaddress_dns_record_name"] = value + + value = exc.error_dict.pop("value", None) + if value is not None: + exc.error_dict["cf_ipaddress_dns_record_name"] = value + + raise ValidationError(exc) + # - # Check permission to create DNS record before IP address creation - # NB: If IP address is created *before* DNS record is allowed it's too late - # → permission check must be done at pre-save, and an exception - # must be raised to prevent IP creation. + # Update DNS related fields according to the contents of the IPAM-DNS + # coupling custom fields. # def pre_save(self, sender, **kwargs): - if kwargs.get("update_fields"): - return - ip_address = kwargs.get("instance") - name = ip_address.custom_field_data.get("ipaddress_dns_record_name") - zone_id = ip_address.custom_field_data.get("ipaddress_dns_zone_id") - - # Handle new IPAddress objects only; name and zone must both be defined - if ip_address.id is None and name is not None and zone_id is not None: - zone = Zone.objects.get(id=zone_id) - type = ( - RecordTypeChoices.AAAA - if ip_address.family == 6 - else RecordTypeChoices.A - ) - value = str(ip_address.address.ip) - - # Create a DNS record *without saving* in order to check permissions - record = Record(name=name, zone=zone, type=type, value=value) - user = self.request.user - check_record_permission(user, record, "netbox_dns.add_record") + name, zone_id = address_record_cf_data(ip_address) + + if zone_id is not None: + ip_address.dns_name = f"{name}.{Zone.objects.get(pk=zone_id).name}" + else: + ip_address.dns_name = "" + ip_address.custom_field_data["ipaddress_dns_record_name"] = None + ip_address.custom_field_data["ipaddress_dns_zone_id"] = None # # Handle DNS record operation after IPAddress has been created or modified # def post_save(self, sender, **kwargs): - # Do not process specific field update (eg. dns_hostname modify) - if kwargs.get("update_fields"): - return - ip_address = kwargs.get("instance") - user = self.request.user - name = ip_address.custom_field_data.get("ipaddress_dns_record_name") - zone_id = ip_address.custom_field_data.get("ipaddress_dns_zone_id") - zone = Zone.objects.get(id=zone_id) if zone_id is not None else None - - # Clear the other field if one is empty, which is inconsistent - if name is None: - zone = None - elif zone is None: - name = None - - # Delete the DNS record because name and zone have been removed - if zone is None: - # Find the record pointing to this IP Address - for record in ip_address.netbox_dns_records.all(): - # If permission ok, clear all fields related to DNS - check_record_permission(user, record, "netbox_dns.delete_record") - - ip_address.dns_name = "" - ip_address.custom_field_data["ipaddress_dns_record_name"] = "" - ip_address.save(update_fields=["custom_field_data", "dns_name"]) - - record.delete() - - # Modify or add the DNS record - else: - type = ( - RecordTypeChoices.AAAA - if ip_address.family == 6 - else RecordTypeChoices.A - ) - - # If DNS record already point to this IP, modify it - record = ip_address.netbox_dns_records.first() - if record is not None: - record.name = name - record.zone = zone - record.value = str(ip_address.address.ip) - record.type = type - - check_record_permission(user, record, "netbox_dns.change_record") - record.save() + name, zone_id = address_record_cf_data(ip_address) + + if zone_id is None: + # + # Name/Zone CF data has been removed: Remove the DNS address record + # + ip_address.netbox_dns_records.all().delete() + else: + # + # Name/Zone CF data is present: Check for a DNS address record + # and add or modify it as necessary + # + record = get_address_record(ip_address) + if record is None: + record = new_address_record(ip_address) else: - # Create a new record - record = Record( - name=name, - zone=zone, - type=type, - value=str(ip_address.address.ip), - ipam_ip_address=ip_address, - managed=True, - ) - - check_record_permission( - user, record, "netbox_dns.add_record", commit=True - ) - - # Update the dns_name field with FQDN - ip_address.dns_name = record.fqdn.rstrip(".") - ip_address.save(update_fields=["dns_name"]) + update_address_record(record, ip_address) + + record.save() # # Delete DNS record before deleting IP address @@ -121,49 +185,11 @@ def pre_delete(self, sender, **kwargs): for record in ip_address.netbox_dns_records.all(): user = self.request.user - check_record_permission(user, record, "netbox_dns.delete_record") + check_address_record(user, record, "netbox_dns.delete_record") record.delete() -# -# Filter through permissions. Simulate adding the record in the "add" case. -# NB: Side-effect if "commit" is set to True → the DNS record is created. -# This is necessary to avoid the cascading effects of PTR creation. -# -def check_record_permission(user, record, perm, commit=False): - # Check that the user has been granted the required permission(s). - action = resolve_permission(perm)[1] - - if not user.has_perm(perm): - raise PermissionDenied() - - try: - with transaction.atomic(): - # Save record when adding - # Rollback is done at the end of the transaction, unless committed - - if action == "add": - record.save() - - # Update the view's QuerySet to filter only the permitted objects - queryset = Record.objects.restrict(user, action) - # Check that record conforms to permissions - # → must be included in the restricted queryset - if not queryset.filter(pk=record.pk).exists(): - raise PermissionDenied() - - if not commit: - raise AbortRequest("Normal Exit") - - # Catch "Normal Exit" without modification, rollback transaction - except AbortRequest as exc: - pass - - except Exception as exc: - raise exc - - class IpamCouplingMiddleware: def __init__(self, get_response): if not get_plugin_config("netbox_dns", "feature_ipam_coupling"): @@ -172,9 +198,12 @@ def __init__(self, get_response): self.get_response = get_response def __call__(self, request): - # connect signals to actions + # + # Connect signals to actions + # action = Action(request) connections = [ + (post_clean, action.post_clean), (signals.pre_save, action.pre_save), (signals.post_save, action.post_save), (signals.pre_delete, action.pre_delete), diff --git a/netbox_dns/models.py b/netbox_dns/models.py index ccb05f59..da92d7ec 100644 --- a/netbox_dns/models.py +++ b/netbox_dns/models.py @@ -587,7 +587,7 @@ def delete(self, *args, **kwargs): custom_field_data__ipaddress_dns_zone_id=self.pk ): ip.dns_name = "" - ip.custom_field_data["ipaddress_dns_record_name"] = "" + ip.custom_field_data["ipaddress_dns_record_name"] = None ip.custom_field_data["ipaddress_dns_zone_id"] = None ip.save(update_fields=["dns_name", "custom_field_data"]) diff --git a/netbox_dns/tests/coupling/test_ip_dns_coupling.py b/netbox_dns/tests/coupling/test_ip_dns_coupling.py index c0fc40da..95816c9b 100644 --- a/netbox_dns/tests/coupling/test_ip_dns_coupling.py +++ b/netbox_dns/tests/coupling/test_ip_dns_coupling.py @@ -1,14 +1,11 @@ from django.urls import reverse from django.test import override_settings -from django.contrib.contenttypes.models import ContentType from django.core import management from rest_framework import status from utilities.testing import APITestCase -from extras.models import CustomField -from extras.choices import CustomFieldTypeChoices -from ipam.models import IPAddress, Prefix +from ipam.models import IPAddress from netaddr import IPNetwork from netbox_dns.models import Record, Zone, NameServer, RecordTypeChoices @@ -37,7 +34,6 @@ def setUpTestData(cls): cls.zone2 = Zone.objects.create( name="zone2.example.com", **cls.zone_data, soa_mname=cls.nameserver ) - cls.prefix = Prefix.objects.create(prefix=cls.network) # # Add the required custom fields @@ -152,8 +148,6 @@ def test_delete_ip(self): self.add_permissions("netbox_dns.delete_record") # Create DNS record and IP Address - A = RecordTypeChoices.A - s_addr = str(addr.ip) ip_address = IPAddress.objects.create( address=addr, dns_name=f"{name}.{zone.name}", @@ -165,8 +159,8 @@ def test_delete_ip(self): record = Record.objects.create( name=name, zone=zone, - type=A, - value=s_addr, + type=RecordTypeChoices.A, + value=str(addr.ip), ipam_ip_address=ip_address, ) @@ -202,13 +196,11 @@ def test_modify_name_existing_ip(self): }, ) # Create DNS record - A = RecordTypeChoices.A - s_addr = str(addr.ip) - record = Record.objects.create( + Record.objects.create( name=name, zone=zone, - type=A, - value=s_addr, + type=RecordTypeChoices.A, + value=str(addr.ip), ipam_ip_address=ip_address, ) @@ -256,13 +248,11 @@ def test_modify_name_existing_ip_missing_dns_permission(self): }, ) # Create DNS record - A = RecordTypeChoices.A - s_addr = str(addr.ip) - record = Record.objects.create( + Record.objects.create( name=name, zone=zone, - type=A, - value=s_addr, + type=RecordTypeChoices.A, + value=str(addr.ip), ipam_ip_address=ip_address, ) @@ -308,13 +298,11 @@ def test_clear_name_existing_ip(self): }, ) # Create DNS record - A = RecordTypeChoices.A - s_addr = str(addr.ip) - record = Record.objects.create( + Record.objects.create( name=name, zone=zone, - type=A, - value=s_addr, + type=RecordTypeChoices.A, + value=str(addr.ip), ipam_ip_address=ip_address, ) @@ -331,7 +319,7 @@ def test_clear_name_existing_ip(self): # Check if dns_name is empty self.assertEqual(ip_address.dns_name, "") cf_name = ip_address.custom_field_data.get("ipaddress_dns_record_name") - self.assertEqual(cf_name, "") + self.assertIsNone(cf_name) @override_settings(PLUGINS_CONFIG={"netbox_dns": {"feature_ipam_coupling": True}}) def test_rename_zone_existing_ip(self): @@ -354,13 +342,11 @@ def test_rename_zone_existing_ip(self): }, ) # Create DNS record - A = RecordTypeChoices.A - s_addr = str(addr.ip) - record = Record.objects.create( + Record.objects.create( name=name, zone=zone, - type=A, - value=s_addr, + type=RecordTypeChoices.A, + value=str(addr.ip), ipam_ip_address=ip_address, ) @@ -395,10 +381,12 @@ def test_delete_zone_existing_ip(self): }, ) # Create DNS record - A = RecordTypeChoices.A - s_addr = str(addr.ip) record = Record.objects.create( - name=name, zone=zone, type=A, value=s_addr, ipam_ip_address=ip_address + name=name, + zone=zone, + type=RecordTypeChoices.A, + value=str(addr.ip), + ipam_ip_address=ip_address, ) # Grant permissions to user @@ -419,4 +407,4 @@ def test_delete_zone_existing_ip(self): self.assertEqual(ip_address.dns_name, "") # Check if custom field "ipaddress_dns_record_name" is empty cf_name = ip_address.custom_field_data.get("ipaddress_dns_record_name") - self.assertEqual(cf_name, "") + self.assertIsNone(cf_name)