Skip to content

Commit

Permalink
Merge pull request #79 from peteeckel/fix/error-handling-in-middleware
Browse files Browse the repository at this point in the history
Fixed DNS exception handling for IP addresses with coupled DNS records
  • Loading branch information
peteeckel authored Oct 20, 2023
2 parents fdb2e5e + 116967f commit f460a47
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 165 deletions.
289 changes: 159 additions & 130 deletions netbox_dns/middleware.py
Original file line number Diff line number Diff line change
@@ -1,117 +1,181 @@
from django.db import transaction
from django.db.models import signals
from django.core.exceptions import MiddlewareNotUsed, PermissionDenied
from django.core.exceptions import MiddlewareNotUsed, PermissionDenied, ValidationError

from ipam.models import IPAddress
from ipam.choices import IPAddressStatusChoices
from extras.plugins import get_plugin_config
from netbox_dns.models import Zone, Record, RecordTypeChoices
from utilities.exceptions import PermissionsViolation, AbortRequest
from netbox.signals import post_clean
from utilities.permissions import resolve_permission

from netbox_dns.models import Zone, Record, RecordTypeChoices, RecordStatusChoices


def address_record_cf_data(ip_address):
name = ip_address.custom_field_data.get("ipaddress_dns_record_name")
zone_id = ip_address.custom_field_data.get("ipaddress_dns_zone_id")

if name is None or zone_id is None:
return None, None

return name, zone_id


def address_record_type(ip_address):
return RecordTypeChoices.AAAA if ip_address.family == 6 else RecordTypeChoices.A


def address_record_status(ip_address):
return (
RecordStatusChoices.STATUS_ACTIVE
if ip_address.status == IPAddressStatusChoices.STATUS_ACTIVE
else RecordStatusChoices.STATUS_INACTIVE
)


def new_address_record(ip_address):
name, zone_id = address_record_cf_data(ip_address)

if zone_id is None:
return None

return Record(
name=name,
zone_id=zone_id,
status=address_record_status(ip_address),
type=address_record_type(ip_address),
value=str(ip_address.address.ip),
ipam_ip_address_id=ip_address.id,
managed=True,
)


def get_address_record(ip_address):
return ip_address.netbox_dns_records.first()


def update_address_record(record, ip_address):
name, zone_id = address_record_cf_data(ip_address)

record.name = name
record.zone_id = zone_id
record.status = address_record_status(ip_address)
record.value = str(ip_address.address.ip)


def check_address_record(user, record, permission):
action = resolve_permission(permission)[1]

if not user.has_perm(permission):
raise PermissionDenied()

with transaction.atomic():
if action in {"add", "change"}:
record.save()

queryset = Record.objects.restrict(user=user, action=action)
if not queryset.filter(pk=record.pk).exists():
raise PermissionDenied()

transaction.set_rollback(True)


class Action:
def __init__(self, request):
self.request = request

def post_clean(self, sender, **kwargs):
ip_address = kwargs.get("instance")
user = self.request.user

try:
if ip_address.id is None:
#
# Handle new IP addresses
#
record = new_address_record(ip_address)
if record is not None:
check_address_record(user, record, "netbox_dns.add_record")
else:
#
# Handle updates to existing IP addresses
#
record = get_address_record(ip_address)
if record is not None:
#
# Update or delete the existing address record
#
name, zone_id = address_record_cf_data(ip_address)
if zone_id is not None:
#
# Update the address record
#
update_address_record(record, ip_address)
check_address_record(user, record, "netbox_dns.change_record")
else:
#
# Delete the address record
#
check_address_record(user, record, "netbox_dns.delete_record")
else:
#
# Create a new address record
#
record = new_address_record(ip_address)
if record is not None:
check_address_record(user, record, "netbox_dns.add_record")

except ValidationError as exc:
value = exc.error_dict.pop("name", None)
if value is not None:
exc.error_dict["cf_ipaddress_dns_record_name"] = value

value = exc.error_dict.pop("value", None)
if value is not None:
exc.error_dict["cf_ipaddress_dns_record_name"] = value

raise ValidationError(exc)

#
# Check permission to create DNS record before IP address creation
# NB: If IP address is created *before* DNS record is allowed it's too late
# → permission check must be done at pre-save, and an exception
# must be raised to prevent IP creation.
# Update DNS related fields according to the contents of the IPAM-DNS
# coupling custom fields.
#
def pre_save(self, sender, **kwargs):
if kwargs.get("update_fields"):
return

ip_address = kwargs.get("instance")
name = ip_address.custom_field_data.get("ipaddress_dns_record_name")
zone_id = ip_address.custom_field_data.get("ipaddress_dns_zone_id")

# Handle new IPAddress objects only; name and zone must both be defined
if ip_address.id is None and name is not None and zone_id is not None:
zone = Zone.objects.get(id=zone_id)
type = (
RecordTypeChoices.AAAA
if ip_address.family == 6
else RecordTypeChoices.A
)
value = str(ip_address.address.ip)

# Create a DNS record *without saving* in order to check permissions
record = Record(name=name, zone=zone, type=type, value=value)
user = self.request.user
check_record_permission(user, record, "netbox_dns.add_record")
name, zone_id = address_record_cf_data(ip_address)

if zone_id is not None:
ip_address.dns_name = f"{name}.{Zone.objects.get(pk=zone_id).name}"
else:
ip_address.dns_name = ""
ip_address.custom_field_data["ipaddress_dns_record_name"] = None
ip_address.custom_field_data["ipaddress_dns_zone_id"] = None

#
# Handle DNS record operation after IPAddress has been created or modified
#
def post_save(self, sender, **kwargs):
# Do not process specific field update (eg. dns_hostname modify)
if kwargs.get("update_fields"):
return

ip_address = kwargs.get("instance")
user = self.request.user
name = ip_address.custom_field_data.get("ipaddress_dns_record_name")
zone_id = ip_address.custom_field_data.get("ipaddress_dns_zone_id")
zone = Zone.objects.get(id=zone_id) if zone_id is not None else None

# Clear the other field if one is empty, which is inconsistent
if name is None:
zone = None
elif zone is None:
name = None

# Delete the DNS record because name and zone have been removed
if zone is None:
# Find the record pointing to this IP Address
for record in ip_address.netbox_dns_records.all():
# If permission ok, clear all fields related to DNS
check_record_permission(user, record, "netbox_dns.delete_record")

ip_address.dns_name = ""
ip_address.custom_field_data["ipaddress_dns_record_name"] = ""
ip_address.save(update_fields=["custom_field_data", "dns_name"])

record.delete()

# Modify or add the DNS record
else:
type = (
RecordTypeChoices.AAAA
if ip_address.family == 6
else RecordTypeChoices.A
)

# If DNS record already point to this IP, modify it
record = ip_address.netbox_dns_records.first()
if record is not None:
record.name = name
record.zone = zone
record.value = str(ip_address.address.ip)
record.type = type

check_record_permission(user, record, "netbox_dns.change_record")
record.save()
name, zone_id = address_record_cf_data(ip_address)

if zone_id is None:
#
# Name/Zone CF data has been removed: Remove the DNS address record
#
ip_address.netbox_dns_records.all().delete()

else:
#
# Name/Zone CF data is present: Check for a DNS address record
# and add or modify it as necessary
#
record = get_address_record(ip_address)
if record is None:
record = new_address_record(ip_address)
else:
# Create a new record
record = Record(
name=name,
zone=zone,
type=type,
value=str(ip_address.address.ip),
ipam_ip_address=ip_address,
managed=True,
)

check_record_permission(
user, record, "netbox_dns.add_record", commit=True
)

# Update the dns_name field with FQDN
ip_address.dns_name = record.fqdn.rstrip(".")
ip_address.save(update_fields=["dns_name"])
update_address_record(record, ip_address)

record.save()

#
# Delete DNS record before deleting IP address
Expand All @@ -121,49 +185,11 @@ def pre_delete(self, sender, **kwargs):

for record in ip_address.netbox_dns_records.all():
user = self.request.user
check_record_permission(user, record, "netbox_dns.delete_record")
check_address_record(user, record, "netbox_dns.delete_record")

record.delete()


#
# Filter through permissions. Simulate adding the record in the "add" case.
# NB: Side-effect if "commit" is set to True → the DNS record is created.
# This is necessary to avoid the cascading effects of PTR creation.
#
def check_record_permission(user, record, perm, commit=False):
# Check that the user has been granted the required permission(s).
action = resolve_permission(perm)[1]

if not user.has_perm(perm):
raise PermissionDenied()

try:
with transaction.atomic():
# Save record when adding
# Rollback is done at the end of the transaction, unless committed

if action == "add":
record.save()

# Update the view's QuerySet to filter only the permitted objects
queryset = Record.objects.restrict(user, action)
# Check that record conforms to permissions
# → must be included in the restricted queryset
if not queryset.filter(pk=record.pk).exists():
raise PermissionDenied()

if not commit:
raise AbortRequest("Normal Exit")

# Catch "Normal Exit" without modification, rollback transaction
except AbortRequest as exc:
pass

except Exception as exc:
raise exc


class IpamCouplingMiddleware:
def __init__(self, get_response):
if not get_plugin_config("netbox_dns", "feature_ipam_coupling"):
Expand All @@ -172,9 +198,12 @@ def __init__(self, get_response):
self.get_response = get_response

def __call__(self, request):
# connect signals to actions
#
# Connect signals to actions
#
action = Action(request)
connections = [
(post_clean, action.post_clean),
(signals.pre_save, action.pre_save),
(signals.post_save, action.post_save),
(signals.pre_delete, action.pre_delete),
Expand Down
2 changes: 1 addition & 1 deletion netbox_dns/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ def delete(self, *args, **kwargs):
custom_field_data__ipaddress_dns_zone_id=self.pk
):
ip.dns_name = ""
ip.custom_field_data["ipaddress_dns_record_name"] = ""
ip.custom_field_data["ipaddress_dns_record_name"] = None
ip.custom_field_data["ipaddress_dns_zone_id"] = None
ip.save(update_fields=["dns_name", "custom_field_data"])

Expand Down
Loading

0 comments on commit f460a47

Please sign in to comment.