From c6c8859a11fdb93d45ef1b9dce9050c209dc6016 Mon Sep 17 00:00:00 2001 From: Peter Eckel Date: Sat, 18 Nov 2023 13:10:31 +0100 Subject: [PATCH] Improved DNS record creation for IPAM coupling * Removed the middleware code as it turned out to be too limited for the purpose. More specifically, it is not used for model-level changes to objects, so it doesn't work with Custom Scripts etc. (see issue #105) * All checks and operations are now done in signal handlers for ipam.IPAddress that are part of NetBox DNS. Future releases (dropping support for NetBox < 3.7) could be using CUSTOM_VALIDATORS and PROTECTION_RULES instead. * Testing was extended to cover model-level actions and API level actions to IPAddress objects. * Improved DNS record permission checks. Open issue: * Although actions with missing permissions to delete DNS records work properly in the API and the GUI, the respective tests have to be skipped as they fail in the context of unit testing. Potential reasons include a bug in the test framework. This does not have any functional implications and just affects tests for the deletion of IPAddress objects with missing NetBox DNS (object) permissions. --- netbox_dns/__init__.py | 1 - netbox_dns/middleware.py | 226 ----- netbox_dns/models/__init__.py | 2 + .../{tests/coupling => signals}/__init__.py | 0 netbox_dns/signals/ipam_coupling.py | 149 +++ netbox_dns/tables/zone.py | 1 - .../tests/coupling/test_ip_dns_coupling.py | 415 -------- netbox_dns/tests/ipam_coupling/__init__.py | 0 netbox_dns/tests/ipam_coupling/test_api.py | 957 ++++++++++++++++++ .../tests/ipam_coupling/test_records.py | 301 ++++++ .../{utilities.py => utilities/__init__.py} | 0 netbox_dns/utilities/ipam_coupling.py | 87 ++ 12 files changed, 1496 insertions(+), 643 deletions(-) delete mode 100644 netbox_dns/middleware.py rename netbox_dns/{tests/coupling => signals}/__init__.py (100%) create mode 100644 netbox_dns/signals/ipam_coupling.py delete mode 100644 netbox_dns/tests/coupling/test_ip_dns_coupling.py create mode 100644 netbox_dns/tests/ipam_coupling/__init__.py create mode 100644 netbox_dns/tests/ipam_coupling/test_api.py create mode 100644 netbox_dns/tests/ipam_coupling/test_records.py rename netbox_dns/{utilities.py => utilities/__init__.py} (100%) create mode 100644 netbox_dns/utilities/ipam_coupling.py diff --git a/netbox_dns/__init__.py b/netbox_dns/__init__.py index 20457bdf..6f081608 100644 --- a/netbox_dns/__init__.py +++ b/netbox_dns/__init__.py @@ -20,7 +20,6 @@ class DNSConfig(PluginConfig): version = __version__ author = "Peter Eckel" author_email = "pe-netbox-plugin-dns@hindenburgring.com" - middleware = ["netbox_dns.middleware.IpamCouplingMiddleware"] required_settings = [] default_settings = { "zone_default_ttl": 86400, diff --git a/netbox_dns/middleware.py b/netbox_dns/middleware.py deleted file mode 100644 index d81b9472..00000000 --- a/netbox_dns/middleware.py +++ /dev/null @@ -1,226 +0,0 @@ -from django.db import transaction -from django.db.models import signals -from django.core.exceptions import MiddlewareNotUsed, PermissionDenied, ValidationError - -from ipam.models import IPAddress -from ipam.choices import IPAddressStatusChoices - -try: - # NetBox 3.5.0 - 3.5.7, 3.5.9+ - from extras.plugins import get_plugin_config -except ImportError: - # NetBox 3.5.8 - from extras.plugins.utils import get_plugin_config -from netbox.signals import post_clean -from utilities.permissions import resolve_permission - -from netbox_dns.models import Zone, Record, RecordTypeChoices, RecordStatusChoices - - -def address_record_cf_data(ip_address): - name = ip_address.custom_field_data.get("ipaddress_dns_record_name") - zone_id = ip_address.custom_field_data.get("ipaddress_dns_zone_id") - - if name is None or zone_id is None: - return None, None - - return name, zone_id - - -def address_record_type(ip_address): - return RecordTypeChoices.AAAA if ip_address.family == 6 else RecordTypeChoices.A - - -def address_record_status(ip_address): - return ( - RecordStatusChoices.STATUS_ACTIVE - if ip_address.status == IPAddressStatusChoices.STATUS_ACTIVE - else RecordStatusChoices.STATUS_INACTIVE - ) - - -def new_address_record(ip_address): - name, zone_id = address_record_cf_data(ip_address) - - if zone_id is None: - return None - - return Record( - name=name, - zone_id=zone_id, - status=address_record_status(ip_address), - type=address_record_type(ip_address), - value=str(ip_address.address.ip), - ipam_ip_address_id=ip_address.id, - managed=True, - ) - - -def get_address_record(ip_address): - return ip_address.netbox_dns_records.first() - - -def update_address_record(record, ip_address): - name, zone_id = address_record_cf_data(ip_address) - - record.name = name - record.zone_id = zone_id - record.status = address_record_status(ip_address) - record.value = str(ip_address.address.ip) - - -def check_address_record(user, record, permission): - action = resolve_permission(permission)[1] - - if not user.has_perm(permission): - raise PermissionDenied() - - with transaction.atomic(): - if action in {"add", "change"}: - record.save() - - queryset = Record.objects.restrict(user=user, action=action) - if not queryset.filter(pk=record.pk).exists(): - raise PermissionDenied() - - transaction.set_rollback(True) - - -class Action: - def __init__(self, request): - self.request = request - - def post_clean(self, sender, **kwargs): - ip_address = kwargs.get("instance") - user = self.request.user - - try: - if ip_address.id is None: - # - # Handle new IP addresses - # - record = new_address_record(ip_address) - if record is not None: - check_address_record(user, record, "netbox_dns.add_record") - else: - # - # Handle updates to existing IP addresses - # - record = get_address_record(ip_address) - if record is not None: - # - # Update or delete the existing address record - # - name, zone_id = address_record_cf_data(ip_address) - if zone_id is not None: - # - # Update the address record - # - update_address_record(record, ip_address) - check_address_record(user, record, "netbox_dns.change_record") - else: - # - # Delete the address record - # - check_address_record(user, record, "netbox_dns.delete_record") - else: - # - # Create a new address record - # - record = new_address_record(ip_address) - if record is not None: - check_address_record(user, record, "netbox_dns.add_record") - - except ValidationError as exc: - value = exc.error_dict.pop("name", None) - if value is not None: - exc.error_dict["cf_ipaddress_dns_record_name"] = value - - value = exc.error_dict.pop("value", None) - if value is not None: - exc.error_dict["cf_ipaddress_dns_record_name"] = value - - raise ValidationError(exc) - - # - # Update DNS related fields according to the contents of the IPAM-DNS - # coupling custom fields. - # - def pre_save(self, sender, **kwargs): - ip_address = kwargs.get("instance") - name, zone_id = address_record_cf_data(ip_address) - - if zone_id is not None: - ip_address.dns_name = f"{name}.{Zone.objects.get(pk=zone_id).name}" - else: - ip_address.dns_name = "" - ip_address.custom_field_data["ipaddress_dns_record_name"] = None - ip_address.custom_field_data["ipaddress_dns_zone_id"] = None - - # - # Handle DNS record operation after IPAddress has been created or modified - # - def post_save(self, sender, **kwargs): - ip_address = kwargs.get("instance") - name, zone_id = address_record_cf_data(ip_address) - - if zone_id is None: - # - # Name/Zone CF data has been removed: Remove the DNS address record - # - for record in ip_address.netbox_dns_records.all(): - record.delete() - - else: - # - # Name/Zone CF data is present: Check for a DNS address record - # and add or modify it as necessary - # - record = get_address_record(ip_address) - if record is None: - record = new_address_record(ip_address) - else: - update_address_record(record, ip_address) - - record.save() - - # - # Delete DNS record before deleting IP address - # - def pre_delete(self, sender, **kwargs): - ip_address = kwargs.get("instance") - - for record in ip_address.netbox_dns_records.all(): - user = self.request.user - check_address_record(user, record, "netbox_dns.delete_record") - - record.delete() - - -class IpamCouplingMiddleware: - def __init__(self, get_response): - if not get_plugin_config("netbox_dns", "feature_ipam_coupling"): - raise MiddlewareNotUsed - - self.get_response = get_response - - def __call__(self, request): - # - # Connect signals to actions - # - action = Action(request) - connections = [ - (post_clean, action.post_clean), - (signals.pre_save, action.pre_save), - (signals.post_save, action.post_save), - (signals.pre_delete, action.pre_delete), - ] - for signal, receiver in connections: - signal.connect(receiver, sender=IPAddress) - - response = self.get_response(request) - - for signal, receiver in connections: - signal.disconnect(receiver) - - return response diff --git a/netbox_dns/models/__init__.py b/netbox_dns/models/__init__.py index c7e22741..e9831d86 100644 --- a/netbox_dns/models/__init__.py +++ b/netbox_dns/models/__init__.py @@ -4,3 +4,5 @@ from .view import * from .contact import * from .registrar import * + +from netbox_dns.signals import ipam_coupling diff --git a/netbox_dns/tests/coupling/__init__.py b/netbox_dns/signals/__init__.py similarity index 100% rename from netbox_dns/tests/coupling/__init__.py rename to netbox_dns/signals/__init__.py diff --git a/netbox_dns/signals/ipam_coupling.py b/netbox_dns/signals/ipam_coupling.py new file mode 100644 index 00000000..90dfb683 --- /dev/null +++ b/netbox_dns/signals/ipam_coupling.py @@ -0,0 +1,149 @@ +from django.dispatch import receiver +from django.db.models.signals import pre_save, post_save, pre_delete +from django.core.exceptions import ValidationError, PermissionDenied +from rest_framework.exceptions import PermissionDenied as APIPermissionDenied + +from netbox.signals import post_clean +from netbox.context import current_request +from ipam.models import IPAddress + +from netbox_dns.models import Zone +from netbox_dns.utilities.ipam_coupling import ( + ipaddress_cf_data, + get_address_record, + new_address_record, + update_address_record, + check_permission, + dns_changed, + DNSPermissionDenied, +) + +try: + # NetBox 3.5.0 - 3.5.7, 3.5.9+ + from extras.plugins import get_plugin_config +except ImportError: + # NetBox 3.5.8 + from extras.plugins.utils import get_plugin_config + + +@receiver(post_clean, sender=IPAddress) +def ip_address_check_permissions_save(instance, **kwargs): + if not get_plugin_config("netbox_dns", "feature_ipam_coupling"): + return + + request = current_request.get() + if request is None: + return + + try: + if instance.id is None: + record = new_address_record(instance) + if record is not None: + record.full_clean() + check_permission(request, "netbox_dns.add_record", record) + + else: + if not dns_changed(IPAddress.objects.get(pk=instance.id), instance): + return + + record = get_address_record(instance) + if record is not None: + name, zone_id = ipaddress_cf_data(instance) + if zone_id is not None: + update_address_record(record, instance) + record.full_clean() + check_permission(request, "netbox_dns.change_record", record) + else: + check_permission(request, "netbox_dns.delete_record", record) + + else: + record = new_address_record(instance) + if record is not None: + record.full_clean() + check_permission(request, "netbox_dns.add_record", record) + + except ValidationError as exc: + if hasattr(exc, "error_dict"): + value = exc.error_dict.pop("name", None) + if value is not None: + exc.error_dict["cf_ipaddress_dns_record_name"] = value + + value = exc.error_dict.pop("value", None) + if value is not None: + exc.error_dict["cf_ipaddress_dns_record_name"] = value + + raise ValidationError(exc) + + except DNSPermissionDenied as exc: + raise ValidationError(exc) + + +@receiver(pre_delete, sender=IPAddress) +def ip_address_delete_address_record(instance, **kwargs): + if not get_plugin_config("netbox_dns", "feature_ipam_coupling"): + return + + request = current_request.get() + if request is not None: + try: + for record in instance.netbox_dns_records.all(): + check_permission(request, "netbox_dns.delete_record", record) + + except DNSPermissionDenied as exc: + if request.path_info.startswith("/api/"): + raise APIPermissionDenied(exc) from None + + raise PermissionDenied(exc) from None + + for record in instance.netbox_dns_records.all(): + record.delete() + + +# +# Update DNS related fields according to the contents of the IPAM-DNS +# coupling custom fields. +# +@receiver(pre_save, sender=IPAddress) +def ip_address_update_dns_information(instance, **kwargs): + if not get_plugin_config("netbox_dns", "feature_ipam_coupling"): + return + + name, zone_id = ipaddress_cf_data(instance) + + if zone_id is not None: + instance.dns_name = f"{name}.{Zone.objects.get(pk=zone_id).name}" + else: + instance.dns_name = "" + instance.custom_field_data["ipaddress_dns_record_name"] = None + instance.custom_field_data["ipaddress_dns_zone_id"] = None + + +# +# Handle DNS record operation after IPAddress has been created or modified +# +@receiver(post_save, sender=IPAddress) +def ip_address_update_address_record(instance, **kwargs): + if not get_plugin_config("netbox_dns", "feature_ipam_coupling"): + return + + name, zone_id = ipaddress_cf_data(instance) + + if zone_id is None: + # + # Name/Zone CF data has been removed: Remove the DNS address record + # + for record in instance.netbox_dns_records.all(): + record.delete() + + else: + # + # Name/Zone CF data is present: Check for a DNS address record and add + # or modify it as necessary + # + record = get_address_record(instance) + if record is None: + record = new_address_record(instance) + else: + update_address_record(record, instance) + + record.save() diff --git a/netbox_dns/tables/zone.py b/netbox_dns/tables/zone.py index 5e7fb0c9..08e336ac 100644 --- a/netbox_dns/tables/zone.py +++ b/netbox_dns/tables/zone.py @@ -4,7 +4,6 @@ ChoiceFieldColumn, NetBoxTable, TagColumn, - ActionsColumn, ) from tenancy.tables import TenancyColumnsMixin diff --git a/netbox_dns/tests/coupling/test_ip_dns_coupling.py b/netbox_dns/tests/coupling/test_ip_dns_coupling.py deleted file mode 100644 index c371f7b3..00000000 --- a/netbox_dns/tests/coupling/test_ip_dns_coupling.py +++ /dev/null @@ -1,415 +0,0 @@ -from django.urls import reverse -from django.test import override_settings -from django.core import management - -from rest_framework import status -from utilities.testing import APITestCase - -from ipam.models import IPAddress -from netaddr import IPNetwork -from netbox_dns.models import Record, Zone, NameServer, RecordTypeChoices - - -class IPAddressDNSRecordCouplingTest(APITestCase): - network = "10.0.0.0/24" - ns = "ns1.example.com" - zone_data = { - "default_ttl": 86400, - "soa_rname": "hostmaster.example.com", - "soa_refresh": 172800, - "soa_retry": 7200, - "soa_expire": 2592000, - "soa_ttl": 86400, - "soa_minimum": 3600, - "soa_serial": 1, - } - - @classmethod - def setUpTestData(cls): - # Test data - cls.nameserver = NameServer.objects.create(name=cls.ns) - cls.zones = ( - Zone(name="zone1.example.com", **cls.zone_data, soa_mname=cls.nameserver), - Zone(name="zone2.example.com", **cls.zone_data, soa_mname=cls.nameserver), - Zone(name="0.0.10.in-addr.arpa", **cls.zone_data, soa_mname=cls.nameserver), - ) - for zone in cls.zones: - zone.save() - - # - # Add the required custom fields - # - management.call_command("setup_coupling") - - @override_settings(PLUGINS_CONFIG={"netbox_dns": {"feature_ipam_coupling": True}}) - def test_create_ip(self): - zone = self.zones[0] - name = "test-create" - addr = "10.0.0.25/24" - - # Grant permissions to user - self.add_permissions("ipam.add_ipaddress") - self.add_permissions("netbox_dns.add_record") - - url = reverse("ipam-api:ipaddress-list") - data = { - "address": addr, - "custom_fields": { - "ipaddress_dns_zone_id": zone.id, - "ipaddress_dns_record_name": name, - }, - } - response = self.client.post(url, data, format="json", **self.header) - - self.assertTrue(status.is_success(response.status_code)) - - # Check if "record" has been created, is managed and has correct name and zone - ip_id = response.data["id"] - record = Record.objects.get(ipam_ip_address=ip_id) - self.assertEqual(record.name, name) - self.assertEqual(record.zone.id, zone.id) - self.assertTrue(record.managed) - # Check value of dns_name - ip_address = IPAddress.objects.get(id=ip_id) - self.assertEqual(ip_address.dns_name, f"{name}.{zone.name}") - - @override_settings(PLUGINS_CONFIG={"netbox_dns": {"feature_ipam_coupling": True}}) - def test_create_ip_existing_dns_record(self): - zone = self.zones[0] - name = "test-create-ip-existing-dns-record" - addr = "10.0.0.25/24" - - # Create DNS record - record = Record.objects.create( - name=name, - zone=zone, - type=RecordTypeChoices.A, - value=str(IPNetwork(addr).ip), - ) - - # Grant permissions to user - self.add_permissions("ipam.add_ipaddress") - self.add_permissions("netbox_dns.add_record") - - url = reverse("ipam-api:ipaddress-list") - data = { - "address": addr, - "custom_fields": { - "ipaddress_dns_zone_id": zone.id, - "ipaddress_dns_record_name": name, - }, - } - response = self.client.post(url, data, format="json", **self.header) - - self.assertTrue(status.is_success(response.status_code)) - - # Check if "record" has been linked to and is now managed - ip_address = IPAddress.objects.get(id=response.data["id"]) - record = Record.objects.get(ipam_ip_address=ip_address) - self.assertTrue(record.managed) - self.assertEqual(record.name, name) - self.assertEqual(record.zone, zone) - # Check value of dns_name - self.assertEqual(ip_address.dns_name, f"{name}.{zone.name}") - - @override_settings(PLUGINS_CONFIG={"netbox_dns": {"feature_ipam_coupling": True}}) - def test_create_ip_missing_dns_permission(self): - zone = self.zones[0] - name = "test-create-ip-missing-dns-perm" - addr = "10.0.0.26/24" - - # Grant only IPAddress add permission to user, - # and *no* DNS record add permission - self.add_permissions("ipam.add_ipaddress") - - url = reverse("ipam-api:ipaddress-list") - data = { - "address": addr, - "custom_fields": { - "ipaddress_dns_zone_id": zone.id, - "ipaddress_dns_record_name": name, - }, - } - response = self.client.post(url, data, format="json", **self.header) - - # Should be denied - self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) - # No IP address should have been created - self.assertFalse(IPAddress.objects.filter(address=addr).exists()) - # No DNS Record should have been created - self.assertFalse(Record.objects.filter(name=name, zone_id=zone.id).exists()) - - @override_settings(PLUGINS_CONFIG={"netbox_dns": {"feature_ipam_coupling": True}}) - def test_delete_ip(self): - zone = self.zones[0] - name = "test-delete-ip" - addr = IPNetwork("10.0.0.27/24") - # Grant delete on both IP address and DNS record - self.add_permissions("ipam.delete_ipaddress") - self.add_permissions("netbox_dns.delete_record") - - # Create DNS record and IP Address - ip_address = IPAddress.objects.create( - address=addr, - dns_name=f"{name}.{zone.name}", - custom_field_data={ - "ipaddress_dns_record_name": name, - "ipaddress_dns_zone_id": zone.id, - }, - ) - record = Record.objects.create( - name=name, - zone=zone, - type=RecordTypeChoices.A, - value=str(addr.ip), - ipam_ip_address=ip_address, - ) - ptr_record_id = record.ptr_record.pk - - # Delete address - url = reverse("ipam-api:ipaddress-list") + str(ip_address.id) + "/" - response = self.client.delete(url, **self.header) - # Check response - self.assertTrue(status.is_success(response.status_code)) - # Check if DNS record has been deleted - self.assertFalse(Record.objects.filter(id=record.id).exists()) - self.assertFalse(Record.objects.filter(id=ptr_record_id).exists()) - # Check if IP address has been deleted - self.assertFalse(IPAddress.objects.filter(id=ip_address.id).exists()) - - @override_settings(PLUGINS_CONFIG={"netbox_dns": {"feature_ipam_coupling": True}}) - def test_modify_name_existing_ip(self): - addr = IPNetwork("10.0.0.27/24") - zone1 = self.zones[0] - zone2 = self.zones[1] - name = "test-modify-name-existing-ip" - newname = "newname" - - # Grant permissions to user - self.add_permissions("ipam.change_ipaddress") - self.add_permissions("netbox_dns.change_record") - - # Create IP Address - ip_address = IPAddress.objects.create( - address=addr, - dns_name=f"{name}.{zone1.name}", - custom_field_data={ - "ipaddress_dns_record_name": name, - "ipaddress_dns_zone_id": zone1.id, - }, - ) - # Create DNS record - Record.objects.create( - name=name, - zone=zone1, - type=RecordTypeChoices.A, - value=str(addr.ip), - ipam_ip_address=ip_address, - ) - - # Change name and zone - url = reverse("ipam-api:ipaddress-list") + str(ip_address.id) + "/" - data = { - "custom_fields": { - "ipaddress_dns_record_name": newname, - "ipaddress_dns_zone_id": zone2.id, - } - } - response = self.client.patch(url, data, format="json", **self.header) - - # Check response - self.assertTrue(status.is_success(response.status_code)) - - # Check if ip still point to a record and if record matches - ip_address = IPAddress.objects.get(id=ip_address.id) - record_query = Record.objects.filter(ipam_ip_address=ip_address) - self.assertEqual(record_query.count(), 1) - self.assertEqual(record_query[0].name, newname) - self.assertEqual(record_query[0].zone, zone2) - # Check value of dns_name - self.assertEqual(ip_address.dns_name, f"{newname}.{zone2.name}") - - @override_settings(PLUGINS_CONFIG={"netbox_dns": {"feature_ipam_coupling": True}}) - def test_modify_name_existing_ip_missing_dns_permission(self): - addr = IPNetwork("10.0.0.27/24") - zone1 = self.zones[0] - zone2 = self.zones[1] - name = "test-modify-name-existing-ip-no-perm" - - newname = "newname" - - # Grant permissions to user - self.add_permissions("ipam.change_ipaddress") - - # Create IP Address - ip_address = IPAddress.objects.create( - address=addr, - dns_name=f"{name}.{zone1.name}", - custom_field_data={ - "ipaddress_dns_record_name": name, - "ipaddress_dns_zone_id": zone1.id, - }, - ) - # Create DNS record - Record.objects.create( - name=name, - zone=zone1, - type=RecordTypeChoices.A, - value=str(addr.ip), - ipam_ip_address=ip_address, - ) - - # Change name and zone - url = reverse("ipam-api:ipaddress-list") + str(ip_address.id) + "/" - data = { - "custom_fields": { - "ipaddress_dns_record_name": newname, - "ipaddress_dns_zone_id": zone2.id, - } - } - response = self.client.patch(url, data, format="json", **self.header) - - # Check response - self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) - - # Check for no changes - ip_address = IPAddress.objects.get(id=ip_address.id) - record_query = Record.objects.filter(ipam_ip_address=ip_address) - self.assertEqual(record_query.count(), 1) - self.assertEqual(record_query[0].name, name) - self.assertEqual(record_query[0].zone, zone1) - # Check value of dns_name - self.assertEqual(ip_address.dns_name, f"{name}.{zone1.name}") - - @override_settings(PLUGINS_CONFIG={"netbox_dns": {"feature_ipam_coupling": True}}) - def test_clear_name_existing_ip(self): - addr = IPNetwork("10.0.0.28/24") - zone = self.zones[0] - name = "test-clear-name-existing-ip" - - # Grant permissions to user - self.add_permissions("ipam.change_ipaddress") - self.add_permissions("netbox_dns.delete_record") - - # Create IP Address - ip_address = IPAddress.objects.create( - address=addr, - dns_name=f"{name}.{zone.name}", - custom_field_data={ - "ipaddress_dns_record_name": name, - "ipaddress_dns_zone_id": zone.id, - }, - ) - # Create DNS record - record = Record.objects.create( - name=name, - zone=zone, - type=RecordTypeChoices.A, - value=str(addr.ip), - ipam_ip_address=ip_address, - ) - ptr_record_id = record.ptr_record.pk - - url = reverse("ipam-api:ipaddress-list") + str(ip_address.id) + "/" - data = {"custom_fields": {"ipaddress_dns_zone_id": None}} - response = self.client.patch(url, data, format="json", **self.header) - - # Check response - self.assertTrue(status.is_success(response.status_code)) - # Check if record has been deleted - self.assertFalse(Record.objects.filter(ipam_ip_address=ip_address).exists()) - self.assertFalse(Record.objects.filter(pk=ptr_record_id).exists()) - # Re-read IPAddress object - ip_address = IPAddress.objects.get(id=ip_address.id) - # Check if dns_name is empty - self.assertEqual(ip_address.dns_name, "") - cf_name = ip_address.custom_field_data.get("ipaddress_dns_record_name") - self.assertIsNone(cf_name) - - @override_settings(PLUGINS_CONFIG={"netbox_dns": {"feature_ipam_coupling": True}}) - def test_rename_zone_existing_ip(self): - addr = IPNetwork("10.0.0.29/24") - zone = self.zones[0] - name = "test-rename-zone-existing-ip" - new_zone_name = "newzone.example.com" - - # Grant permissions to user - self.add_permissions("ipam.change_ipaddress") - self.add_permissions("netbox_dns.change_zone") - - # Create IP Address - ip_address = IPAddress.objects.create( - address=addr, - dns_name=f"{name}.{zone.name}", - custom_field_data={ - "ipaddress_dns_record_name": name, - "ipaddress_dns_zone_id": zone.id, - }, - ) - # Create DNS record - Record.objects.create( - name=name, - zone=zone, - type=RecordTypeChoices.A, - value=str(addr.ip), - ipam_ip_address=ip_address, - ) - - url = reverse("plugins-api:netbox_dns-api:zone-list") + str(zone.id) + "/" - data = {"name": new_zone_name} - response = self.client.patch(url, data, format="json", **self.header) - - # Check response - self.assertTrue(status.is_success(response.status_code)) - # Re-read IPAddress object - ip_address = IPAddress.objects.get(id=ip_address.id) - # Check if dns_name has correct value - self.assertEqual(ip_address.dns_name, f"{name}.{new_zone_name}") - # Check if record zone has correct name - record_query = Record.objects.filter(ipam_ip_address=ip_address) - self.assertEqual(record_query.count(), 1) - self.assertEqual(record_query[0].zone.name, new_zone_name) - - @override_settings(PLUGINS_CONFIG={"netbox_dns": {"feature_ipam_coupling": True}}) - def test_delete_zone_existing_ip(self): - addr = IPNetwork("10.0.0.30/24") - zone = self.zones[0] - name = "test-delete-zone-existing-ip" - - # Create IP Address - ip_address = IPAddress.objects.create( - address=addr, - dns_name=f"{name}.{zone.name}", - custom_field_data={ - "ipaddress_dns_record_name": name, - "ipaddress_dns_zone_id": zone.id, - }, - ) - # Create DNS record - record = Record.objects.create( - name=name, - zone=zone, - type=RecordTypeChoices.A, - value=str(addr.ip), - ipam_ip_address=ip_address, - ) - - # Grant permissions to user - self.add_permissions("ipam.change_ipaddress") - self.add_permissions("netbox_dns.delete_zone") - self.add_permissions("netbox_dns.delete_record") - - url = reverse("plugins-api:netbox_dns-api:zone-list") + str(zone.id) + "/" - response = self.client.delete(url, **self.header) - - # Check response - self.assertTrue(status.is_success(response.status_code)) - # Check if record has been deleted - self.assertFalse(Record.objects.filter(id=record.id).exists()) - # Re-read IPAddress object - ip_address = IPAddress.objects.get(id=ip_address.id) - # Check if dns_name is empty - self.assertEqual(ip_address.dns_name, "") - # Check if custom field "ipaddress_dns_record_name" is empty - cf_name = ip_address.custom_field_data.get("ipaddress_dns_record_name") - self.assertIsNone(cf_name) diff --git a/netbox_dns/tests/ipam_coupling/__init__.py b/netbox_dns/tests/ipam_coupling/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/netbox_dns/tests/ipam_coupling/test_api.py b/netbox_dns/tests/ipam_coupling/test_api.py new file mode 100644 index 00000000..45f5d727 --- /dev/null +++ b/netbox_dns/tests/ipam_coupling/test_api.py @@ -0,0 +1,957 @@ +import requests + +from unittest import skip + +from django.urls import reverse +from django.test import override_settings +from django.core import management +from django.contrib.contenttypes.models import ContentType + +from rest_framework import status +from utilities.testing import APITestCase, disable_warnings + +from ipam.models import IPAddress +from netaddr import IPNetwork +from netbox_dns.models import Record, Zone, NameServer, RecordTypeChoices +from users.models import ObjectPermission + + +class IPAMCouplingAPITest(APITestCase): + zone_data = { + "default_ttl": 86400, + "soa_rname": "hostmaster.example.com", + "soa_refresh": 172800, + "soa_retry": 7200, + "soa_expire": 2592000, + "soa_ttl": 86400, + "soa_minimum": 3600, + "soa_serial": 1, + } + + @classmethod + def setUpTestData(cls): + cls.nameserver = NameServer.objects.create(name="ns1.example.com") + cls.zones = ( + Zone(name="zone1.example.com", **cls.zone_data, soa_mname=cls.nameserver), + Zone(name="zone2.example.com", **cls.zone_data, soa_mname=cls.nameserver), + Zone(name="0.0.10.in-addr.arpa", **cls.zone_data, soa_mname=cls.nameserver), + ) + for zone in cls.zones: + zone.save() + + # + # Add the required custom fields + # + management.call_command("setup_coupling") + + @override_settings(PLUGINS_CONFIG={"netbox_dns": {"feature_ipam_coupling": True}}) + def test_create_ip_with_dns_permission(self): + zone = self.zones[0] + name = "name42" + address = "10.0.0.25/24" + + self.add_permissions("ipam.add_ipaddress") + self.add_permissions("netbox_dns.add_record") + + url = reverse("ipam-api:ipaddress-list") + data = { + "address": address, + "custom_fields": { + "ipaddress_dns_zone_id": zone.id, + "ipaddress_dns_record_name": name, + }, + } + response = self.client.post(url, data, format="json", **self.header) + + self.assertHttpStatus(response, status.HTTP_201_CREATED) + + ip_address = IPAddress.objects.get(pk=response.data["id"]) + address_record = ip_address.netbox_dns_records.first() + + self.assertEqual(address_record.name, name) + self.assertEqual(address_record.zone, zone) + self.assertTrue(address_record.managed) + + self.assertEqual(ip_address.dns_name, f"{name}.{zone.name}") + + @override_settings(PLUGINS_CONFIG={"netbox_dns": {"feature_ipam_coupling": True}}) + def test_create_ip_missing_dns_permission(self): + zone = self.zones[0] + name = "name42" + address = "10.0.0.42/24" + + self.add_permissions("ipam.add_ipaddress") + + url = reverse("ipam-api:ipaddress-list") + data = { + "address": address, + "custom_fields": { + "ipaddress_dns_zone_id": zone.id, + "ipaddress_dns_record_name": name, + }, + } + response = self.client.post(url, data, format="json", **self.header) + + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + + self.assertFalse(IPAddress.objects.filter(address=address).exists()) + self.assertFalse(Record.objects.filter(name=name, zone_id=zone.id).exists()) + + @override_settings(PLUGINS_CONFIG={"netbox_dns": {"feature_ipam_coupling": True}}) + def test_delete_ip_with_dns_permission(self): + zone = self.zones[0] + name = "name23" + address = IPNetwork("10.0.0.23/24") + + self.add_permissions("ipam.delete_ipaddress") + self.add_permissions("netbox_dns.delete_record") + + ip_address = IPAddress.objects.create( + address=address, + custom_field_data={ + "ipaddress_dns_record_name": name, + "ipaddress_dns_zone_id": zone.id, + }, + ) + address_record = ip_address.netbox_dns_records.first() + ptr_record_id = address_record.ptr_record.pk + + url = reverse("ipam-api:ipaddress-list") + str(ip_address.id) + "/" + response = self.client.delete(url, **self.header) + + self.assertHttpStatus(response, status.HTTP_204_NO_CONTENT) + self.assertFalse(IPAddress.objects.filter(pk=ip_address.pk).exists()) + self.assertFalse(Record.objects.filter(pk=address_record.pk).exists()) + self.assertFalse(Record.objects.filter(pk=ptr_record_id).exists()) + + @override_settings(PLUGINS_CONFIG={"netbox_dns": {"feature_ipam_coupling": True}}) + def test_delete_ip_with_dns_object_permission(self): + zone = self.zones[0] + name = "name23" + address = IPNetwork("10.0.0.23/24") + + self.add_permissions("ipam.delete_ipaddress") + + object_permission = ObjectPermission( + name=f"Delete Test Record", actions=["delete"], constraints={"name": name} + ) + object_permission.save() + object_permission.object_types.add(ContentType.objects.get_for_model(Record)) + object_permission.users.add(self.user) + + ip_address = IPAddress.objects.create( + address=address, + custom_field_data={ + "ipaddress_dns_record_name": name, + "ipaddress_dns_zone_id": zone.id, + }, + ) + address_record = ip_address.netbox_dns_records.first() + ptr_record_id = address_record.ptr_record.pk + + url = reverse("ipam-api:ipaddress-list") + str(ip_address.id) + "/" + response = self.client.delete(url, **self.header) + + self.assertHttpStatus(response, status.HTTP_204_NO_CONTENT) + self.assertFalse(IPAddress.objects.filter(pk=ip_address.pk).exists()) + self.assertFalse(Record.objects.filter(pk=address_record.pk).exists()) + self.assertFalse(Record.objects.filter(pk=ptr_record_id).exists()) + + @skip("APIClient has problems handling exceptions") + @override_settings(PLUGINS_CONFIG={"netbox_dns": {"feature_ipam_coupling": True}}) + def test_delete_ip_missing_dns_permission(self): + zone = self.zones[0] + name = "name23" + address = IPNetwork("10.0.0.23/24") + + self.add_permissions("ipam.delete_ipaddress") + + ip_address = IPAddress.objects.create( + address=address, + custom_field_data={ + "ipaddress_dns_record_name": name, + "ipaddress_dns_zone_id": zone.id, + }, + ) + address_record = ip_address.netbox_dns_records.first() + ptr_record_id = address_record.ptr_record_id + + url = reverse("ipam-api:ipaddress-list") + str(ip_address.id) + "/" + response = self.client.delete(url, **self.header) + + self.assertHttpStatus(response, status.HTTP_403_FORBIDDEN) + self.assertTrue(IPAddress.objects.filter(pk=ip_address.pk).exists()) + self.assertTrue(Record.objects.filter(pk=address_record.pk).exists()) + self.assertTrue(Record.objects.filter(pk=ptr_record_id).exists()) + + @skip("APIClient has problems handling exceptions") + @override_settings(PLUGINS_CONFIG={"netbox_dns": {"feature_ipam_coupling": True}}) + def test_delete_ip_missing_dns_object_permission(self): + zone = self.zones[0] + name = "name23" + address = IPNetwork("10.0.0.23/24") + + self.add_permissions("ipam.delete_ipaddress") + + object_permission = ObjectPermission( + name=f"Delete Test Record", + actions=["delete"], + constraints={"name": "whatever"}, + ) + object_permission.save() + object_permission.object_types.add(ContentType.objects.get_for_model(Record)) + object_permission.users.add(self.user) + + ip_address = IPAddress.objects.create( + address=address, + custom_field_data={ + "ipaddress_dns_record_name": name, + "ipaddress_dns_zone_id": zone.id, + }, + ) + address_record = ip_address.netbox_dns_records.first() + ptr_record_id = address_record.ptr_record_id + + url = reverse("ipam-api:ipaddress-list") + str(ip_address.id) + "/" + response = self.client.delete(url, **self.header) + + self.assertHttpStatus(response, status.HTTP_403_FORBIDDEN) + self.assertTrue(IPAddress.objects.filter(pk=ip_address.pk).exists()) + self.assertTrue(Record.objects.filter(pk=address_record.pk).exists()) + self.assertTrue(Record.objects.filter(pk=ptr_record_id).exists()) + + @override_settings(PLUGINS_CONFIG={"netbox_dns": {"feature_ipam_coupling": True}}) + def test_modify_name_with_dns_permission(self): + addr = IPNetwork("10.0.0.42/24") + zone = self.zones[0] + name1 = "name42" + name2 = "name23" + + self.add_permissions("ipam.change_ipaddress") + self.add_permissions("netbox_dns.change_record") + + ip_address = IPAddress.objects.create( + address=addr, + custom_field_data={ + "ipaddress_dns_record_name": name1, + "ipaddress_dns_zone_id": zone.id, + }, + ) + + url = reverse("ipam-api:ipaddress-list") + str(ip_address.id) + "/" + data = { + "custom_fields": { + "ipaddress_dns_record_name": name2, + "ipaddress_dns_zone_id": zone.id, + } + } + response = self.client.patch(url, data, format="json", **self.header) + + self.assertHttpStatus(response, status.HTTP_200_OK) + + ip_address.refresh_from_db() + + record_query = Record.objects.filter(ipam_ip_address=ip_address) + self.assertEqual(record_query.count(), 1) + self.assertEqual(record_query[0].name, name2) + self.assertEqual(record_query[0].zone, zone) + self.assertEqual(ip_address.dns_name, f"{name2}.{zone.name}") + + @override_settings(PLUGINS_CONFIG={"netbox_dns": {"feature_ipam_coupling": True}}) + def test_modify_name_with_dns_object_permission(self): + addr = IPNetwork("10.0.0.42/24") + zone = self.zones[0] + name1 = "name42" + name2 = "name23" + + self.add_permissions("ipam.change_ipaddress") + + object_permission = ObjectPermission( + name=f"Modify Test Record", actions=["change"], constraints={"name": name1} + ) + object_permission.save() + object_permission.object_types.add(ContentType.objects.get_for_model(Record)) + object_permission.users.add(self.user) + + ip_address = IPAddress.objects.create( + address=addr, + custom_field_data={ + "ipaddress_dns_record_name": name1, + "ipaddress_dns_zone_id": zone.id, + }, + ) + + url = reverse("ipam-api:ipaddress-list") + str(ip_address.id) + "/" + data = { + "custom_fields": { + "ipaddress_dns_record_name": name2, + "ipaddress_dns_zone_id": zone.id, + } + } + response = self.client.patch(url, data, format="json", **self.header) + + self.assertHttpStatus(response, status.HTTP_200_OK) + + ip_address.refresh_from_db() + + record_query = Record.objects.filter(ipam_ip_address=ip_address) + self.assertEqual(record_query.count(), 1) + self.assertEqual(record_query[0].name, name2) + self.assertEqual(record_query[0].zone, zone) + self.assertEqual(ip_address.dns_name, f"{name2}.{zone.name}") + + @override_settings(PLUGINS_CONFIG={"netbox_dns": {"feature_ipam_coupling": True}}) + def test_modify_zone_with_dns_permission(self): + addr = IPNetwork("10.0.0.42/24") + zone1 = self.zones[0] + zone2 = self.zones[1] + name = "name42" + + self.add_permissions("ipam.change_ipaddress") + self.add_permissions("netbox_dns.change_record") + + ip_address = IPAddress.objects.create( + address=addr, + custom_field_data={ + "ipaddress_dns_record_name": name, + "ipaddress_dns_zone_id": zone1.id, + }, + ) + + url = reverse("ipam-api:ipaddress-list") + str(ip_address.id) + "/" + data = { + "custom_fields": { + "ipaddress_dns_record_name": name, + "ipaddress_dns_zone_id": zone2.id, + } + } + response = self.client.patch(url, data, format="json", **self.header) + + self.assertHttpStatus(response, status.HTTP_200_OK) + + ip_address.refresh_from_db() + + record_query = Record.objects.filter(ipam_ip_address=ip_address) + self.assertEqual(record_query.count(), 1) + self.assertEqual(record_query[0].name, name) + self.assertEqual(record_query[0].zone, zone2) + self.assertEqual(ip_address.dns_name, f"{name}.{zone2.name}") + + @override_settings(PLUGINS_CONFIG={"netbox_dns": {"feature_ipam_coupling": True}}) + def test_modify_zone_with_dns_object_permission(self): + addr = IPNetwork("10.0.0.42/24") + zone1 = self.zones[0] + zone2 = self.zones[1] + name = "name42" + + self.add_permissions("ipam.change_ipaddress") + + object_permission = ObjectPermission( + name=f"Modify Test Record", actions=["change"], constraints={"name": name} + ) + object_permission.save() + object_permission.object_types.add(ContentType.objects.get_for_model(Record)) + object_permission.users.add(self.user) + + ip_address = IPAddress.objects.create( + address=addr, + custom_field_data={ + "ipaddress_dns_record_name": name, + "ipaddress_dns_zone_id": zone1.id, + }, + ) + + url = reverse("ipam-api:ipaddress-list") + str(ip_address.id) + "/" + data = { + "custom_fields": { + "ipaddress_dns_record_name": name, + "ipaddress_dns_zone_id": zone2.id, + } + } + response = self.client.patch(url, data, format="json", **self.header) + + self.assertHttpStatus(response, status.HTTP_200_OK) + + ip_address.refresh_from_db() + + record_query = Record.objects.filter(ipam_ip_address=ip_address) + self.assertEqual(record_query.count(), 1) + self.assertEqual(record_query[0].name, name) + self.assertEqual(record_query[0].zone, zone2) + self.assertEqual(ip_address.dns_name, f"{name}.{zone2.name}") + + @override_settings(PLUGINS_CONFIG={"netbox_dns": {"feature_ipam_coupling": True}}) + def test_modify_name_missing_dns_permission(self): + addr = IPNetwork("10.0.0.42/24") + zone = self.zones[0] + name1 = "name42" + name2 = "name23" + + self.add_permissions("ipam.change_ipaddress") + + ip_address = IPAddress.objects.create( + address=addr, + custom_field_data={ + "ipaddress_dns_record_name": name1, + "ipaddress_dns_zone_id": zone.id, + }, + ) + + url = reverse("ipam-api:ipaddress-list") + str(ip_address.id) + "/" + data = { + "custom_fields": { + "ipaddress_dns_record_name": name2, + "ipaddress_dns_zone_id": zone.id, + } + } + response = self.client.patch(url, data, format="json", **self.header) + + self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST) + + ip_address.refresh_from_db() + + record_query = Record.objects.filter(ipam_ip_address=ip_address) + self.assertEqual(record_query.count(), 1) + self.assertEqual(record_query[0].name, name1) + self.assertEqual(record_query[0].zone, zone) + self.assertEqual(ip_address.dns_name, f"{name1}.{zone.name}") + self.assertEqual( + ip_address.custom_field_data, + { + "ipaddress_dns_record_name": name1, + "ipaddress_dns_zone_id": zone.id, + }, + ) + + @override_settings(PLUGINS_CONFIG={"netbox_dns": {"feature_ipam_coupling": True}}) + def test_modify_name_missing_dns_object_permission(self): + addr = IPNetwork("10.0.0.42/24") + zone = self.zones[0] + name1 = "name42" + name2 = "name23" + + self.add_permissions("ipam.change_ipaddress") + + object_permission = ObjectPermission( + name=f"Modify Test Record", + actions=["change"], + constraints={"name": "whatever"}, + ) + object_permission.save() + object_permission.object_types.add(ContentType.objects.get_for_model(Record)) + object_permission.users.add(self.user) + + ip_address = IPAddress.objects.create( + address=addr, + custom_field_data={ + "ipaddress_dns_record_name": name1, + "ipaddress_dns_zone_id": zone.id, + }, + ) + + url = reverse("ipam-api:ipaddress-list") + str(ip_address.id) + "/" + data = { + "custom_fields": { + "ipaddress_dns_record_name": name2, + "ipaddress_dns_zone_id": zone.id, + } + } + response = self.client.patch(url, data, format="json", **self.header) + + self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST) + + ip_address.refresh_from_db() + + record_query = Record.objects.filter(ipam_ip_address=ip_address) + self.assertEqual(record_query.count(), 1) + self.assertEqual(record_query[0].name, name1) + self.assertEqual(record_query[0].zone, zone) + self.assertEqual(ip_address.dns_name, f"{name1}.{zone.name}") + self.assertEqual( + ip_address.custom_field_data, + { + "ipaddress_dns_record_name": name1, + "ipaddress_dns_zone_id": zone.id, + }, + ) + + @override_settings(PLUGINS_CONFIG={"netbox_dns": {"feature_ipam_coupling": True}}) + def test_modify_zone_missing_dns_permission(self): + addr = IPNetwork("10.0.0.42/24") + zone1 = self.zones[0] + zone2 = self.zones[1] + name = "name42" + + self.add_permissions("ipam.change_ipaddress") + + ip_address = IPAddress.objects.create( + address=addr, + custom_field_data={ + "ipaddress_dns_record_name": name, + "ipaddress_dns_zone_id": zone1.id, + }, + ) + + url = reverse("ipam-api:ipaddress-list") + str(ip_address.id) + "/" + data = { + "custom_fields": { + "ipaddress_dns_record_name": name, + "ipaddress_dns_zone_id": zone2.id, + } + } + response = self.client.patch(url, data, format="json", **self.header) + + self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST) + + ip_address.refresh_from_db() + + record_query = Record.objects.filter(ipam_ip_address=ip_address) + self.assertEqual(record_query.count(), 1) + self.assertEqual(record_query[0].name, name) + self.assertEqual(record_query[0].zone, zone1) + self.assertEqual(ip_address.dns_name, f"{name}.{zone1.name}") + self.assertEqual( + ip_address.custom_field_data, + { + "ipaddress_dns_record_name": name, + "ipaddress_dns_zone_id": zone1.id, + }, + ) + + @override_settings(PLUGINS_CONFIG={"netbox_dns": {"feature_ipam_coupling": True}}) + def test_modify_zone_missing_dns_object_permission(self): + addr = IPNetwork("10.0.0.42/24") + zone1 = self.zones[0] + zone2 = self.zones[1] + name = "name42" + + self.add_permissions("ipam.change_ipaddress") + + object_permission = ObjectPermission( + name=f"Modify Test Record", + actions=["change"], + constraints={"name": "whatever"}, + ) + object_permission.save() + object_permission.object_types.add(ContentType.objects.get_for_model(Record)) + object_permission.users.add(self.user) + + ip_address = IPAddress.objects.create( + address=addr, + custom_field_data={ + "ipaddress_dns_record_name": name, + "ipaddress_dns_zone_id": zone1.id, + }, + ) + + url = reverse("ipam-api:ipaddress-list") + str(ip_address.id) + "/" + data = { + "custom_fields": { + "ipaddress_dns_record_name": name, + "ipaddress_dns_zone_id": zone2.id, + } + } + response = self.client.patch(url, data, format="json", **self.header) + + self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST) + + ip_address.refresh_from_db() + + record_query = Record.objects.filter(ipam_ip_address=ip_address) + self.assertEqual(record_query.count(), 1) + self.assertEqual(record_query[0].name, name) + self.assertEqual(record_query[0].zone, zone1) + self.assertEqual(ip_address.dns_name, f"{name}.{zone1.name}") + self.assertEqual( + ip_address.custom_field_data, + { + "ipaddress_dns_record_name": name, + "ipaddress_dns_zone_id": zone1.id, + }, + ) + + @override_settings(PLUGINS_CONFIG={"netbox_dns": {"feature_ipam_coupling": True}}) + def test_clear_name_with_dns_permission(self): + addr = IPNetwork("10.0.0.42/24") + zone = self.zones[0] + name = "name42" + + self.add_permissions("ipam.change_ipaddress") + self.add_permissions("netbox_dns.delete_record") + + ip_address = IPAddress.objects.create( + address=addr, + custom_field_data={ + "ipaddress_dns_record_name": name, + "ipaddress_dns_zone_id": zone.id, + }, + ) + ptr_record_id = ip_address.netbox_dns_records.first().ptr_record.pk + + url = reverse("ipam-api:ipaddress-list") + str(ip_address.id) + "/" + data = { + "custom_fields": { + "ipaddress_dns_zone_id": zone.id, + "ipaddress_dns_record_name": None, + } + } + response = self.client.patch(url, data, format="json", **self.header) + + self.assertHttpStatus(response, status.HTTP_200_OK) + + self.assertFalse( + Record.objects.filter(ipam_ip_address=ip_address, managed=True).exists() + ) + self.assertFalse(Record.objects.filter(pk=ptr_record_id).exists()) + + ip_address.refresh_from_db() + + self.assertEqual(ip_address.dns_name, "") + self.assertEqual( + ip_address.custom_field_data, + { + "ipaddress_dns_record_name": None, + "ipaddress_dns_zone_id": None, + }, + ) + + @override_settings(PLUGINS_CONFIG={"netbox_dns": {"feature_ipam_coupling": True}}) + def test_clear_name_with_dns_object_permission(self): + addr = IPNetwork("10.0.0.42/24") + zone = self.zones[0] + name = "name42" + + self.add_permissions("ipam.change_ipaddress") + + object_permission = ObjectPermission( + name=f"Delete Test Record", actions=["delete"], constraints={"name": name} + ) + object_permission.save() + object_permission.object_types.add(ContentType.objects.get_for_model(Record)) + object_permission.users.add(self.user) + + ip_address = IPAddress.objects.create( + address=addr, + custom_field_data={ + "ipaddress_dns_record_name": name, + "ipaddress_dns_zone_id": zone.id, + }, + ) + ptr_record_id = ip_address.netbox_dns_records.first().ptr_record.pk + + url = reverse("ipam-api:ipaddress-list") + str(ip_address.id) + "/" + data = { + "custom_fields": { + "ipaddress_dns_zone_id": zone.id, + "ipaddress_dns_record_name": None, + } + } + response = self.client.patch(url, data, format="json", **self.header) + + self.assertHttpStatus(response, status.HTTP_200_OK) + + self.assertFalse( + Record.objects.filter(ipam_ip_address=ip_address, managed=True).exists() + ) + self.assertFalse(Record.objects.filter(pk=ptr_record_id).exists()) + + ip_address.refresh_from_db() + + self.assertEqual(ip_address.dns_name, "") + self.assertEqual( + ip_address.custom_field_data, + { + "ipaddress_dns_record_name": None, + "ipaddress_dns_zone_id": None, + }, + ) + + @override_settings(PLUGINS_CONFIG={"netbox_dns": {"feature_ipam_coupling": True}}) + def test_clear_zone_with_dns_permission(self): + addr = IPNetwork("10.0.0.42/24") + zone = self.zones[0] + name = "name42" + + self.add_permissions("ipam.change_ipaddress") + self.add_permissions("netbox_dns.delete_record") + + ip_address = IPAddress.objects.create( + address=addr, + custom_field_data={ + "ipaddress_dns_record_name": name, + "ipaddress_dns_zone_id": zone.id, + }, + ) + ptr_record_id = ip_address.netbox_dns_records.first().ptr_record.pk + + url = reverse("ipam-api:ipaddress-list") + str(ip_address.id) + "/" + data = { + "custom_fields": { + "ipaddress_dns_zone_id": None, + "ipaddress_dns_record_name": name, + } + } + response = self.client.patch(url, data, format="json", **self.header) + + self.assertHttpStatus(response, status.HTTP_200_OK) + + self.assertFalse( + Record.objects.filter(ipam_ip_address=ip_address, managed=True).exists() + ) + self.assertFalse(Record.objects.filter(pk=ptr_record_id).exists()) + + ip_address.refresh_from_db() + + self.assertEqual(ip_address.dns_name, "") + self.assertEqual( + ip_address.custom_field_data, + { + "ipaddress_dns_record_name": None, + "ipaddress_dns_zone_id": None, + }, + ) + + @override_settings(PLUGINS_CONFIG={"netbox_dns": {"feature_ipam_coupling": True}}) + def test_clear_zone_with_dns_object_permission(self): + addr = IPNetwork("10.0.0.42/24") + zone = self.zones[0] + name = "name42" + + self.add_permissions("ipam.change_ipaddress") + + object_permission = ObjectPermission( + name=f"Delete Test Record", actions=["delete"], constraints={"name": name} + ) + object_permission.save() + object_permission.object_types.add(ContentType.objects.get_for_model(Record)) + object_permission.users.add(self.user) + + ip_address = IPAddress.objects.create( + address=addr, + custom_field_data={ + "ipaddress_dns_record_name": name, + "ipaddress_dns_zone_id": zone.id, + }, + ) + ptr_record_id = ip_address.netbox_dns_records.first().ptr_record.pk + + url = reverse("ipam-api:ipaddress-list") + str(ip_address.id) + "/" + data = { + "custom_fields": { + "ipaddress_dns_zone_id": None, + "ipaddress_dns_record_name": name, + } + } + response = self.client.patch(url, data, format="json", **self.header) + + self.assertHttpStatus(response, status.HTTP_200_OK) + + self.assertFalse( + Record.objects.filter(ipam_ip_address=ip_address, managed=True).exists() + ) + self.assertFalse(Record.objects.filter(pk=ptr_record_id).exists()) + + ip_address.refresh_from_db() + + self.assertEqual(ip_address.dns_name, "") + self.assertEqual( + ip_address.custom_field_data, + { + "ipaddress_dns_record_name": None, + "ipaddress_dns_zone_id": None, + }, + ) + + @override_settings(PLUGINS_CONFIG={"netbox_dns": {"feature_ipam_coupling": True}}) + def test_clear_name_missing_dns_permission(self): + addr = IPNetwork("10.0.0.42/24") + zone = self.zones[0] + name = "name42" + + self.add_permissions("ipam.change_ipaddress") + + ip_address = IPAddress.objects.create( + address=addr, + custom_field_data={ + "ipaddress_dns_record_name": name, + "ipaddress_dns_zone_id": zone.id, + }, + ) + ptr_record_id = ip_address.netbox_dns_records.first().ptr_record.pk + + url = reverse("ipam-api:ipaddress-list") + str(ip_address.id) + "/" + data = { + "custom_fields": { + "ipaddress_dns_zone_id": zone.id, + "ipaddress_dns_record_name": None, + } + } + response = self.client.patch(url, data, format="json", **self.header) + + self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST) + + self.assertTrue( + Record.objects.filter(ipam_ip_address=ip_address, managed=True).exists() + ) + self.assertTrue(Record.objects.filter(pk=ptr_record_id).exists()) + + ip_address.refresh_from_db() + + self.assertEqual(ip_address.dns_name, f"{name}.{zone.name}") + self.assertEqual( + ip_address.custom_field_data, + { + "ipaddress_dns_record_name": name, + "ipaddress_dns_zone_id": zone.id, + }, + ) + + @override_settings(PLUGINS_CONFIG={"netbox_dns": {"feature_ipam_coupling": True}}) + def test_clear_name_missing_dns_object_permission(self): + addr = IPNetwork("10.0.0.42/24") + zone = self.zones[0] + name = "name42" + + self.add_permissions("ipam.change_ipaddress") + + object_permission = ObjectPermission( + name=f"Delete Test Record", + actions=["delete"], + constraints={"name": "whatever"}, + ) + object_permission.save() + object_permission.object_types.add(ContentType.objects.get_for_model(Record)) + object_permission.users.add(self.user) + + ip_address = IPAddress.objects.create( + address=addr, + custom_field_data={ + "ipaddress_dns_record_name": name, + "ipaddress_dns_zone_id": zone.id, + }, + ) + ptr_record_id = ip_address.netbox_dns_records.first().ptr_record.pk + + url = reverse("ipam-api:ipaddress-list") + str(ip_address.id) + "/" + data = { + "custom_fields": { + "ipaddress_dns_zone_id": zone.id, + "ipaddress_dns_record_name": None, + } + } + response = self.client.patch(url, data, format="json", **self.header) + + self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST) + + self.assertTrue( + Record.objects.filter(ipam_ip_address=ip_address, managed=True).exists() + ) + self.assertTrue(Record.objects.filter(pk=ptr_record_id).exists()) + + ip_address.refresh_from_db() + + self.assertEqual(ip_address.dns_name, f"{name}.{zone.name}") + self.assertEqual( + ip_address.custom_field_data, + { + "ipaddress_dns_record_name": name, + "ipaddress_dns_zone_id": zone.id, + }, + ) + + @override_settings(PLUGINS_CONFIG={"netbox_dns": {"feature_ipam_coupling": True}}) + def test_clear_zone_missing_dns_permission(self): + addr = IPNetwork("10.0.0.42/24") + zone = self.zones[0] + name = "name42" + + self.add_permissions("ipam.change_ipaddress") + + ip_address = IPAddress.objects.create( + address=addr, + custom_field_data={ + "ipaddress_dns_record_name": name, + "ipaddress_dns_zone_id": zone.id, + }, + ) + ptr_record_id = ip_address.netbox_dns_records.first().ptr_record.pk + + url = reverse("ipam-api:ipaddress-list") + str(ip_address.id) + "/" + data = { + "custom_fields": { + "ipaddress_dns_zone_id": None, + "ipaddress_dns_record_name": name, + } + } + response = self.client.patch(url, data, format="json", **self.header) + + self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST) + + self.assertTrue( + Record.objects.filter(ipam_ip_address=ip_address, managed=True).exists() + ) + self.assertTrue(Record.objects.filter(pk=ptr_record_id).exists()) + + ip_address.refresh_from_db() + + self.assertEqual(ip_address.dns_name, f"{name}.{zone.name}") + self.assertEqual( + ip_address.custom_field_data, + { + "ipaddress_dns_record_name": name, + "ipaddress_dns_zone_id": zone.id, + }, + ) + + @override_settings(PLUGINS_CONFIG={"netbox_dns": {"feature_ipam_coupling": True}}) + def test_clear_zone_missing_dns_object_permission(self): + addr = IPNetwork("10.0.0.42/24") + zone = self.zones[0] + name = "name42" + + self.add_permissions("ipam.change_ipaddress") + + object_permission = ObjectPermission( + name=f"Delete Test Record", + actions=["delete"], + constraints={"name": "whatever"}, + ) + object_permission.save() + object_permission.object_types.add(ContentType.objects.get_for_model(Record)) + object_permission.users.add(self.user) + + ip_address = IPAddress.objects.create( + address=addr, + custom_field_data={ + "ipaddress_dns_record_name": name, + "ipaddress_dns_zone_id": zone.id, + }, + ) + ptr_record_id = ip_address.netbox_dns_records.first().ptr_record.pk + + url = reverse("ipam-api:ipaddress-list") + str(ip_address.id) + "/" + data = { + "custom_fields": { + "ipaddress_dns_zone_id": None, + "ipaddress_dns_record_name": name, + } + } + response = self.client.patch(url, data, format="json", **self.header) + + self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST) + + self.assertTrue( + Record.objects.filter(ipam_ip_address=ip_address, managed=True).exists() + ) + self.assertTrue(Record.objects.filter(pk=ptr_record_id).exists()) + + ip_address.refresh_from_db() + + self.assertEqual(ip_address.dns_name, f"{name}.{zone.name}") + self.assertEqual( + ip_address.custom_field_data, + { + "ipaddress_dns_record_name": name, + "ipaddress_dns_zone_id": zone.id, + }, + ) diff --git a/netbox_dns/tests/ipam_coupling/test_records.py b/netbox_dns/tests/ipam_coupling/test_records.py new file mode 100644 index 00000000..bff3469b --- /dev/null +++ b/netbox_dns/tests/ipam_coupling/test_records.py @@ -0,0 +1,301 @@ +from django.test import TestCase, override_settings +from django.core import management +from django.core.exceptions import ValidationError + +from ipam.models import IPAddress +from netaddr import IPNetwork +from netbox_dns.models import Record, Zone, NameServer, RecordTypeChoices + + +class IPAMCouplingRecordTest(TestCase): + zone_data = { + "default_ttl": 86400, + "soa_rname": "hostmaster.example.com", + "soa_refresh": 172800, + "soa_retry": 7200, + "soa_expire": 2592000, + "soa_ttl": 86400, + "soa_minimum": 3600, + "soa_serial": 1, + } + + @classmethod + def setUpTestData(cls): + cls.nameserver = NameServer.objects.create(name="ns1.example.com") + cls.zones = ( + Zone(name="zone1.example.com", **cls.zone_data, soa_mname=cls.nameserver), + Zone(name="zone2.example.com", **cls.zone_data, soa_mname=cls.nameserver), + Zone(name="0.0.10.in-addr.arpa", **cls.zone_data, soa_mname=cls.nameserver), + ) + for zone in cls.zones: + zone.save() + + # + # Add the required custom fields + # + management.call_command("setup_coupling") + + @override_settings(PLUGINS_CONFIG={"netbox_dns": {"feature_ipam_coupling": True}}) + def test_create_ip(self): + zone = self.zones[0] + name = "name42" + address = IPNetwork("10.0.0.42/24") + + ip_address = IPAddress( + address=address, + custom_field_data={ + "ipaddress_dns_zone_id": zone.id, + "ipaddress_dns_record_name": name, + }, + ) + ip_address.save() + + record = Record.objects.get(ipam_ip_address=ip_address) + self.assertEqual(record.name, name) + self.assertEqual(record.zone, zone) + self.assertTrue(record.managed) + + self.assertEqual(ip_address.dns_name, f"{name}.{zone.name}") + + @override_settings(PLUGINS_CONFIG={"netbox_dns": {"feature_ipam_coupling": True}}) + def test_create_ip_existing_dns_record(self): + zone = self.zones[0] + name = "name42" + address = IPNetwork("10.0.0.42/24") + + unmanaged_address_record = Record.objects.create( + name=name, + zone=zone, + type=RecordTypeChoices.A, + value=str(address.ip), + ) + + ip_address = IPAddress.objects.create( + address=address, + custom_field_data={ + "ipaddress_dns_zone_id": zone.id, + "ipaddress_dns_record_name": name, + }, + ) + + managed_address_record = Record.objects.get(ipam_ip_address=ip_address) + self.assertTrue(managed_address_record.managed) + self.assertEqual(managed_address_record.name, name) + self.assertEqual(managed_address_record.zone, zone) + + self.assertEqual(ip_address.dns_name, f"{name}.{zone.name}") + + self.assertFalse(unmanaged_address_record.managed) + self.assertEqual(unmanaged_address_record.name, name) + self.assertEqual(unmanaged_address_record.zone, zone) + self.assertNotEqual(unmanaged_address_record.pk, managed_address_record.pk) + + @override_settings(PLUGINS_CONFIG={"netbox_dns": {"feature_ipam_coupling": True}}) + def test_create_ip_invalid_name(self): + zone = self.zones[0] + name = "aa--name42" + address = IPNetwork("10.0.0.42/24") + + ip_address = IPAddress( + address=address, + custom_field_data={ + "ipaddress_dns_zone_id": zone.id, + "ipaddress_dns_record_name": name, + }, + ) + with self.assertRaises(ValidationError): + ip_address.save() + + @override_settings(PLUGINS_CONFIG={"netbox_dns": {"feature_ipam_coupling": True}}) + def test_delete_ip(self): + zone = self.zones[0] + name = "name23" + address = IPNetwork("10.0.0.23/24") + + ip_address = IPAddress.objects.create( + address=address, + custom_field_data={ + "ipaddress_dns_zone_id": zone.id, + "ipaddress_dns_record_name": name, + }, + ) + address_record = ip_address.netbox_dns_records.first() + ptr_record_id = address_record.ptr_record.id + + ip_address.delete() + + self.assertFalse(IPAddress.objects.filter(id=ip_address.id).exists()) + self.assertFalse(Record.objects.filter(id=address_record.id).exists()) + self.assertFalse(Record.objects.filter(id=ptr_record_id).exists()) + + @override_settings(PLUGINS_CONFIG={"netbox_dns": {"feature_ipam_coupling": True}}) + def test_modify_name_cf(self): + address = IPNetwork("10.0.0.42/24") + zone = self.zones[0] + name1 = "test42" + name2 = "test23" + + ip_address = IPAddress.objects.create( + address=address, + custom_field_data={ + "ipaddress_dns_zone_id": zone.id, + "ipaddress_dns_record_name": name1, + }, + ) + + ip_address.custom_field_data = { + "ipaddress_dns_zone_id": zone.id, + "ipaddress_dns_record_name": name2, + } + ip_address.save() + + record_query = Record.objects.filter(ipam_ip_address=ip_address) + self.assertEqual(record_query.count(), 1) + self.assertEqual(record_query[0].name, name2) + self.assertEqual(record_query[0].zone, zone) + self.assertEqual(ip_address.dns_name, f"{name2}.{zone.name}") + + @override_settings(PLUGINS_CONFIG={"netbox_dns": {"feature_ipam_coupling": True}}) + def test_modify_zone_cf(self): + address = IPNetwork("10.0.0.42/24") + zone1 = self.zones[0] + zone2 = self.zones[1] + name = "test42" + + ip_address = IPAddress.objects.create( + address=address, + custom_field_data={ + "ipaddress_dns_zone_id": zone1.id, + "ipaddress_dns_record_name": name, + }, + ) + + ip_address.custom_field_data = { + "ipaddress_dns_zone_id": zone2.id, + "ipaddress_dns_record_name": name, + } + ip_address.save() + + record_query = Record.objects.filter(ipam_ip_address=ip_address) + self.assertEqual(record_query.count(), 1) + self.assertEqual(record_query[0].name, name) + self.assertEqual(record_query[0].zone, zone2) + self.assertEqual(ip_address.dns_name, f"{name}.{zone2.name}") + + @override_settings(PLUGINS_CONFIG={"netbox_dns": {"feature_ipam_coupling": True}}) + def test_clear_name_cf(self): + address = IPNetwork("10.0.0.27/24") + zone = self.zones[0] + name = "test42" + + ip_address = IPAddress.objects.create( + address=address, + custom_field_data={ + "ipaddress_dns_zone_id": zone.id, + "ipaddress_dns_record_name": name, + }, + ) + self.assertTrue(Record.objects.filter(ipam_ip_address=ip_address).exists()) + + ip_address.custom_field_data = { + "ipaddress_dns_zone_id": zone.id, + "ipaddress_dns_record_name": None, + } + ip_address.save() + + record_query = Record.objects.filter(ipam_ip_address=ip_address) + self.assertFalse(Record.objects.filter(ipam_ip_address=ip_address).exists()) + self.assertFalse( + Record.objects.filter(name=name, zone=zone, managed=True).exists() + ) + self.assertEqual(ip_address.dns_name, "") + self.assertEqual( + ip_address.custom_field_data, + { + "ipaddress_dns_zone_id": None, + "ipaddress_dns_record_name": None, + }, + ) + + @override_settings(PLUGINS_CONFIG={"netbox_dns": {"feature_ipam_coupling": True}}) + def test_clear_zone_cf(self): + address = IPNetwork("10.0.0.27/24") + zone = self.zones[0] + name = "test42" + + ip_address = IPAddress.objects.create( + address=address, + custom_field_data={ + "ipaddress_dns_zone_id": zone.id, + "ipaddress_dns_record_name": name, + }, + ) + self.assertTrue(Record.objects.filter(ipam_ip_address=ip_address).exists()) + + ip_address.custom_field_data = { + "ipaddress_dns_zone_id": None, + "ipaddress_dns_record_name": name, + } + ip_address.save() + + record_query = Record.objects.filter(ipam_ip_address=ip_address) + self.assertFalse(Record.objects.filter(ipam_ip_address=ip_address).exists()) + self.assertFalse( + Record.objects.filter(name=name, zone=zone, managed=True).exists() + ) + self.assertEqual(ip_address.dns_name, "") + self.assertEqual( + ip_address.custom_field_data, + { + "ipaddress_dns_zone_id": None, + "ipaddress_dns_record_name": None, + }, + ) + + @override_settings(PLUGINS_CONFIG={"netbox_dns": {"feature_ipam_coupling": True}}) + def test_rename_zone(self): + address = IPNetwork("10.0.0.27/24") + zone = self.zones[0] + name = "test42" + new_zone_name = "zone3.example.com" + + ip_address = IPAddress.objects.create( + address=address, + custom_field_data={ + "ipaddress_dns_zone_id": zone.id, + "ipaddress_dns_record_name": name, + }, + ) + + zone.name = new_zone_name + zone.save() + + ip_address.refresh_from_db() + self.assertEqual(ip_address.dns_name, f"{name}.{new_zone_name}") + + @override_settings(PLUGINS_CONFIG={"netbox_dns": {"feature_ipam_coupling": True}}) + def test_delete_zone(self): + address = IPNetwork("10.0.0.27/24") + zone = self.zones[0] + name = "test42" + + ip_address = IPAddress.objects.create( + address=address, + custom_field_data={ + "ipaddress_dns_zone_id": zone.id, + "ipaddress_dns_record_name": name, + }, + ) + + zone.delete() + + ip_address.refresh_from_db() + self.assertEqual(ip_address.netbox_dns_records.count(), 0) + self.assertEqual(ip_address.dns_name, f"") + self.assertEqual( + ip_address.custom_field_data, + { + "ipaddress_dns_zone_id": None, + "ipaddress_dns_record_name": None, + }, + ) diff --git a/netbox_dns/utilities.py b/netbox_dns/utilities/__init__.py similarity index 100% rename from netbox_dns/utilities.py rename to netbox_dns/utilities/__init__.py diff --git a/netbox_dns/utilities/ipam_coupling.py b/netbox_dns/utilities/ipam_coupling.py new file mode 100644 index 00000000..e754d09e --- /dev/null +++ b/netbox_dns/utilities/ipam_coupling.py @@ -0,0 +1,87 @@ +from ipam.choices import IPAddressStatusChoices +from utilities.permissions import resolve_permission + +from netbox_dns.models import Record, RecordTypeChoices, RecordStatusChoices + + +class DNSPermissionDenied(Exception): + pass + + +def ipaddress_cf_data(ip_address): + name = ip_address.custom_field_data.get("ipaddress_dns_record_name") + zone_id = ip_address.custom_field_data.get("ipaddress_dns_zone_id") + + if name is None or zone_id is None: + return None, None + + return name, zone_id + + +def address_record_type(ip_address): + return RecordTypeChoices.AAAA if ip_address.family == 6 else RecordTypeChoices.A + + +def address_record_status(ip_address): + return ( + RecordStatusChoices.STATUS_ACTIVE + if ip_address.status == IPAddressStatusChoices.STATUS_ACTIVE + else RecordStatusChoices.STATUS_INACTIVE + ) + + +def get_address_record(ip_address): + return ip_address.netbox_dns_records.first() + + +def new_address_record(instance): + name, zone_id = ipaddress_cf_data(instance) + + if zone_id is None: + return None + + return Record( + name=name, + zone_id=zone_id, + status=address_record_status(instance), + type=address_record_type(instance), + value=str(instance.address.ip), + ipam_ip_address_id=instance.id, + managed=True, + ) + + +def update_address_record(record, ip_address): + name, zone_id = ipaddress_cf_data(ip_address) + + record.name = name + record.zone_id = zone_id + record.status = address_record_status(ip_address) + record.value = str(ip_address.address.ip) + + +def check_permission(request, permission, record=None): + if record is not None and record.pk is None: + check_record = None + else: + check_record = record + + user = request.user + + if not user.has_perm(permission, check_record): + action = resolve_permission(permission)[1] + item = "records" if check_record is None else f"record {check_record}" + + raise DNSPermissionDenied(f"User {user} is not allowed to {action} DNS {item}") + + +def dns_changed(old, new): + return any( + ( + old.address.ip != new.address.ip, + old.custom_field_data["ipaddress_dns_record_name"] + != new.custom_field_data["ipaddress_dns_record_name"], + old.custom_field_data["ipaddress_dns_zone_id"] + != new.custom_field_data["ipaddress_dns_zone_id"], + ) + )