Skip to content

Commit

Permalink
Merge pull request #589 from nautobot/feat-dnac_hostname_mapping
Browse files Browse the repository at this point in the history
Re-add Hostname Mapping Feature to DNA Center
  • Loading branch information
jdrew82 authored Oct 30, 2024
2 parents 17b9959 + f9925d6 commit d3f5a7a
Show file tree
Hide file tree
Showing 13 changed files with 214 additions and 117 deletions.
1 change: 1 addition & 0 deletions changes/588.added
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added support for Software Version object in DNA Center integration.
1 change: 1 addition & 0 deletions changes/588.fixed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed hostname mapping functionality in DNA Center integration. It is now available in the Job form.
1 change: 1 addition & 0 deletions changes/588.removed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Removed use of OS Version CustomField in DNA Center integration. Now uses Software Version from Nautobot 2.2 and/or Device Lifecycle Management SoftwareLCM object if found.
Binary file modified docs/images/dnac_job_form.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
131 changes: 63 additions & 68 deletions nautobot_ssot/integrations/dna_center/diffsync/adapters/dna_center.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Nautobot SSoT for Cisco DNA Center Adapter for DNA Center SSoT plugin."""

import json
from typing import List
from typing import List, Optional

from diffsync import Adapter
from diffsync.exceptions import ObjectNotFound
Expand Down Expand Up @@ -64,7 +64,6 @@ def load_locations(self):
# to ensure we process locations in the appropriate order we need to split them into their own list of locations
self.dnac_location_map = self.build_dnac_location_map(locations)
areas, buildings, floors = self.parse_and_sort_locations(locations)
self.load_areas(areas)
self.load_buildings(buildings)
self.load_floors(floors)
else:
Expand Down Expand Up @@ -149,34 +148,14 @@ def load_controller_locations(self):
attrs={"uuid": None},
)

def load_areas(self, areas: List[dict]):
"""Load areas from DNAC into DiffSync model.
def load_area(self, area: str, area_parent: Optional[str] = None):
"""Load area from DNAC into DiffSync model.
Args:
areas (List[dict]): List of dictionaries containing location information about a building.
area (str): Name of area to be loaded.
area_parent (Optional[str], optional): Name of area's parent if defined. Defaults to None.
"""
for location in areas:
if not settings.PLUGINS_CONFIG["nautobot_ssot"].get("dna_center_import_global"):
if location["name"] == "Global":
continue
parent_name = None
if location.get("parentId") and location["parentId"] in self.dnac_location_map:
parent_name = self.dnac_location_map[location["parentId"]]["name"]
self.dnac_location_map[location["id"]]["parent"] = parent_name
_, loaded = self.get_or_instantiate(
self.area,
ids={"name": location["name"], "parent": parent_name},
attrs={
"uuid": None,
},
)
if loaded:
if self.job.debug:
self.job.logger.info(f"Loaded {self.job.area_loctype.name} {location['name']}. {location}")
else:
self.job.logger.warning(
f"Duplicate {self.job.area_loctype.name} {location['name']} attempting to be loaded."
)
self.get_or_instantiate(self.area, ids={"name": area, "parent": area_parent}, attrs={"uuid": None})

def load_buildings(self, buildings: List[dict]):
"""Load building data from DNAC into DiffSync model.
Expand All @@ -185,35 +164,44 @@ def load_buildings(self, buildings: List[dict]):
buildings (List[dict]): List of dictionaries containing location information about a building.
"""
for location in buildings:
if location["parentId"] in self.dnac_location_map:
_area = self.dnac_location_map[location["parentId"]]
else:
_area = {"name": "Global", "parent": None}
try:
self.get(self.building, {"name": location["name"], "area": _area["name"]})
self.job.logger.warning(
f"{self.job.building_loctype.name} {location['name']} already loaded so skipping."
)
continue
except ObjectNotFound:
if self.job.debug:
self.job.logger.info(f"Loading {self.job.building_loctype.name} {location['name']}. {location}")
address, _ = self.conn.find_address_and_type(info=location["additionalInfo"])
latitude, longitude = self.conn.find_latitude_and_longitude(info=location["additionalInfo"])
new_building = self.building(
name=location["name"],
address=address if address else "",
area=_area["name"],
area_parent=_area["parent"],
latitude=latitude[:9].rstrip("0"),
longitude=longitude[:7].rstrip("0"),
tenant=self.tenant.name if self.tenant else None,
uuid=None,
)
try:
self.add(new_building)
except ValidationError as err:
self.job.logger.warning(f"Unable to load building {location['name']}. {err}")
if self.job.debug:
self.job.logger.info(f"Loading {self.job.building_loctype.name} {location['name']}. {location}")
bldg_name = location["name"]
_area, _area_parent = None, None
if bldg_name in self.job.location_map and "parent" in self.job.location_map[bldg_name]:
_area = self.job.location_map[bldg_name]["parent"]
if "area_parent" in self.job.location_map[bldg_name]:
_area_parent = self.job.location_map[bldg_name]["area_parent"]
elif location["parentId"] in self.dnac_location_map:
_area = self.dnac_location_map[location["parentId"]]["name"]
_area_parent = self.dnac_location_map[location["parentId"]]["parent"]
if _area in self.job.location_map and (
"parent" in self.job.location_map[_area] and bldg_name not in self.job.location_map
):
_area_parent = self.job.location_map[_area]["parent"]
if not settings.PLUGINS_CONFIG["nautobot_ssot"].get("dna_center_import_global"):
if _area == "Global":
_area = None
if _area_parent == "Global":
_area_parent = None
if _area:
self.load_area(area=_area, area_parent=_area_parent)
address, _ = self.conn.find_address_and_type(info=location["additionalInfo"])
latitude, longitude = self.conn.find_latitude_and_longitude(info=location["additionalInfo"])
_, loaded = self.get_or_instantiate(
self.building,
ids={"name": bldg_name, "area": _area},
attrs={
"address": address if address else "",
"area_parent": _area_parent,
"latitude": latitude[:9].rstrip("0"),
"longitude": longitude[:7].rstrip("0"),
"tenant": self.tenant.name if self.tenant else None,
"uuid": None,
},
)
if not loaded:
self.job.logger.warning(f"{self.job.building_loctype.name} {bldg_name} already loaded so skipping.")

def load_floors(self, floors: List[dict]):
"""Load floor data from DNAC into DiffSync model.
Expand Down Expand Up @@ -311,19 +299,25 @@ def load_devices(self):
"""Load Device data from DNA Center info DiffSync models."""
devices = self.conn.get_devices()
for dev in devices:
if not PLUGIN_CFG.get("dna_center_import_merakis") and (
(dev.get("family") and "Meraki" in dev["family"])
or (dev.get("errorDescription") and "Meraki" in dev["errorDescription"])
):
continue
platform = "unknown"
dev_role = "Unknown"
vendor = "Cisco"
if not dev.get("hostname"):
self.job.logger.warning(f"Device {dev['id']} is missing hostname so will be skipped.")
if self.job.debug:
self.job.logger.warning(f"Device {dev['id']} is missing hostname so will be skipped.")
dev["field_validation"] = {
"reason": "Failed due to missing hostname.",
}
self.failed_import_devices.append(dev)
continue
if PLUGIN_CFG.get("dna_center_hostname_mapping"):
if self.job.hostname_map:
dev_role = self.conn.parse_hostname_for_role(
hostname_map=PLUGIN_CFG["dna_center_hostname_mapping"], device_hostname=dev["hostname"]
hostname_map=self.job.hostname_map, device_hostname=dev["hostname"]
)
if dev_role == "Unknown":
dev_role = dev["role"]
Expand All @@ -333,8 +327,6 @@ def load_devices(self):
if not dev.get("softwareType") and dev.get("type") and ("3800" in dev["type"] or "9130" in dev["type"]):
platform = "cisco_ios"
if not dev.get("softwareType") and dev.get("family") and "Meraki" in dev["family"]:
if not PLUGIN_CFG.get("dna_center_import_merakis"):
continue
platform = "cisco_meraki"
if dev.get("type") and "Juniper" in dev["type"]:
vendor = "Juniper"
Expand All @@ -349,7 +341,8 @@ def load_devices(self):
or loc_data.get("building") == "Unassigned"
or not loc_data.get("building")
):
self.job.logger.warning(f"Device {dev['hostname']} is missing building so will not be imported.")
if self.job.debug:
self.job.logger.warning(f"Device {dev['hostname']} is missing building so will not be imported.")
dev["field_validation"] = {
"reason": "Missing building assignment.",
"device_details": dev_details,
Expand All @@ -364,9 +357,10 @@ def load_devices(self):
)
device_found = self.get(self.device, dev["hostname"])
if device_found:
self.job.logger.warning(
f"Duplicate device attempting to be loaded for {dev['hostname']} with ID: {dev['id']} so will not be imported."
)
if self.job.debug:
self.job.logger.warning(
f"Duplicate device attempting to be loaded for {dev['hostname']} with ID: {dev['id']} so will not be imported."
)
dev["field_validation"] = {
"reason": "Failed due to duplicate device found.",
"device_details": dev_details,
Expand Down Expand Up @@ -394,7 +388,8 @@ def load_devices(self):
self.add(new_dev)
self.load_ports(device_id=dev["id"], dev=new_dev, mgmt_addr=dev["managementIpAddress"])
except ValidationError as err:
self.job.logger.warning(f"Unable to load device {dev['hostname']}. {err}")
if self.job.debug:
self.job.logger.warning(f"Unable to load device {dev['hostname']}. {err}")
dev["field_validation"] = {
"reason": f"Failed validation. {err}",
"device_details": dev_details,
Expand All @@ -421,7 +416,7 @@ def load_ports(self, device_id: str, dev: DnaCenterDevice, mgmt_addr: str = ""):
"mac_addr": port["macAddress"].upper() if port.get("macAddress") else None,
},
)
if found_port:
if found_port and self.job.debug:
self.job.logger.warning(
f"Duplicate port attempting to be loaded, {port['portName']} for {dev.name}"
)
Expand Down Expand Up @@ -495,7 +490,7 @@ def load_ip_address(self, host: str, mask_length: int, prefix: str):
self.add(new_prefix)
try:
ip_found = self.get(self.ipaddress, {"host": host, "namespace": namespace})
if ip_found:
if ip_found and self.job.debug:
self.job.logger.warning(f"Duplicate IP Address attempting to be loaded: {host} in {prefix}")
except ObjectNotFound:
if self.job.debug:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,15 +168,22 @@ def load_devices(self):
devices = OrmDevice.objects.filter(_custom_field_data__system_of_record="DNA Center")
for dev in devices:
self.device_map[dev.name] = dev.id
version = dev.custom_field_data.get("os_version")
version = None
if getattr(dev, "software_version"):
version = dev.software_version.version
if LIFECYCLE_MGMT:
dlm_version = None
try:
soft_lcm = OrmRelationship.objects.get(label="Software on Device")
version = OrmRelationshipAssociation.objects.get(
dlm_version = OrmRelationshipAssociation.objects.get(
relationship=soft_lcm, destination_id=dev.id
).source.version
except OrmRelationship.DoesNotExist:
pass
except OrmRelationshipAssociation.DoesNotExist:
pass
if dlm_version != version:
version = None
new_dev = self.device(
name=dev.name,
status=dev.status.name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class Building(DiffSyncModel):

name: str
address: Optional[str] = None
area: str
area: Optional[str] = None
area_parent: Optional[str] = None
latitude: Optional[str] = None
longitude: Optional[str] = None
Expand Down
53 changes: 41 additions & 12 deletions nautobot_ssot/integrations/dna_center/diffsync/models/nautobot.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,19 @@
)

try:
import nautobot_device_lifecycle_mgmt # noqa: F401
from nautobot_device_lifecycle_mgmt import SoftwareLCM # noqa: F401

LIFECYCLE_MGMT = True
except ImportError:
LIFECYCLE_MGMT = False

try:
from nautobot.dcim.models import SoftwareImageFile, SoftwareVersion # noqa: F401

SOFTWARE_VERSION_FOUND_IN_CORE = True
except ImportError:
SOFTWARE_VERSION_FOUND_IN_CORE = False


class NautobotArea(base.Area):
"""Nautobot implementation of Area DiffSync model."""
Expand All @@ -43,15 +50,16 @@ def create(cls, adapter, ids, attrs):
location_type=adapter.job.area_loctype,
status_id=adapter.status_map["Active"],
)
try:
parents_parent = "Global"
if ids["parent"] == "Global":
parents_parent = None
new_area.parent_id = adapter.region_map[parents_parent][ids["parent"]]
except KeyError:
adapter.job.logger.warning(
f"Unable to find {adapter.job.area_loctype.name} {ids['parent']} for {ids['name']}."
)
if ids.get("parent"):
try:
parents_parent = "Global"
if ids["parent"] == "Global":
parents_parent = None
new_area.parent_id = adapter.region_map[parents_parent][ids["parent"]]
except KeyError:
adapter.job.logger.warning(
f"Unable to find {adapter.job.area_loctype.name} {ids['parent']} for {ids['name']}."
)
new_area.validated_save()
if ids["parent"] not in adapter.region_map:
adapter.region_map[ids["parent"]] = {}
Expand Down Expand Up @@ -214,10 +222,20 @@ def create(cls, adapter, ids, attrs):
if attrs.get("tenant"):
new_device.tenant_id = adapter.tenant_map[attrs["tenant"]]
if attrs.get("version"):
new_device.custom_field_data.update({"os_version": attrs["version"]})
if LIFECYCLE_MGMT:
lcm_obj = add_software_lcm(adapter=adapter, platform=platform.network_driver, version=attrs["version"])
assign_version_to_device(adapter=adapter, device=new_device, software_lcm=lcm_obj)
if SOFTWARE_VERSION_FOUND_IN_CORE:
soft_version = SoftwareVersion.objects.get_or_create(
version=attrs["version"], platform=platform, defaults={"status_id": adapter.status_map["Active"]}
)[0]
image, _ = SoftwareImageFile.objects.get_or_create(
image_file_name=f"{platform.name}-{attrs['version']}-dnac-ssot-placeholder",
software_version=soft_version,
status_id=adapter.status_map["Active"],
)
image.device_types.add(device_type)
new_device.software_version = soft_version
new_device.custom_field_data.update({"system_of_record": "DNA Center"})
new_device.custom_field_data.update({"last_synced_from_sor": datetime.today().date().isoformat()})
adapter.objects_to_create["devices"].append(new_device)
Expand Down Expand Up @@ -260,13 +278,22 @@ def update(self, attrs):
if "controller_group" in attrs:
device.controller_managed_device_group = self.adapter.job.controller_group
if "version" in attrs:
device.custom_field_data.update({"os_version": attrs["version"]})
if LIFECYCLE_MGMT:
platform_network_driver = attrs["platform"] if attrs.get("platform") else self.platform
lcm_obj = add_software_lcm(
adapter=self.adapter, platform=platform_network_driver, version=attrs["version"]
)
assign_version_to_device(adapter=self.adapter, device=device, software_lcm=lcm_obj)
if SOFTWARE_VERSION_FOUND_IN_CORE:
if attrs.get("platform"):
platform = attrs["platform"]
else:
platform = self.platform
device.software_version = SoftwareVersion.objects.get_or_create(
version=attrs["version"],
platform__name=platform,
defaults={"status_id": self.adapter.status_map["Active"]},
)[0]
device.custom_field_data.update({"system_of_record": "DNA Center"})
device.custom_field_data.update({"last_synced_from_sor": datetime.today().date().isoformat()})
device.validated_save()
Expand Down Expand Up @@ -416,6 +443,8 @@ def create(cls, adapter, ids, attrs):
def update(self, attrs):
"""Update IPAddress in Nautobot from IPAddress object."""
ipaddr = IPAddress.objects.get(id=self.uuid)
if "mask_length" in attrs:
ipaddr.mask_length = attrs["mask_length"]
if "tenant" in attrs:
if attrs.get("tenant"):
ipaddr.tenant_id = self.adapter.tenant_map[attrs["tenant"]]
Expand Down
Loading

0 comments on commit d3f5a7a

Please sign in to comment.