From ef909790da4bd985cc4c239e4eaeb25d2acc4327 Mon Sep 17 00:00:00 2001 From: Garfield Lee Freeman Date: Thu, 22 Apr 2021 15:44:52 -0700 Subject: [PATCH] feat: Add device group hierarchy support (#321) * feat: Add device group hierarchy support This adds knowledge and handling of device group hierarchy to pan-os-python. Handling of this is moved to the `opstate` namespace since device group hierarchy is not stored in the XPATH of PAN-OS. Note that the "async" job to update a device group's parent seems to finish instantaneously. Most times in my testing the job has already finished by the time I could query the job status with pan-os-python. --- panos/base.py | 20 ++++++-- panos/panorama.py | 106 +++++++++++++++++++++++++++++++++++++++++ tests/test_panorama.py | 95 ++++++++++++++++++++++++++++++++++++ 3 files changed, 217 insertions(+), 4 deletions(-) create mode 100644 tests/test_panorama.py diff --git a/panos/base.py b/panos/base.py index ab7427af..3609fd46 100644 --- a/panos/base.py +++ b/panos/base.py @@ -115,6 +115,21 @@ def __init__(self, *args, **kwargs): # Store the value in the instance variable setattr(self, varname, varvalue) + self._setups() + + def _setups(self): + """The various setup functions that will be called on object creation.""" + funcs = [ + "_setup", + "_setup_opstate", + ] + + for func in funcs: + if hasattr(self, func): + f = getattr(self, func) + if callable(f): + f() + def __str__(self): return self.uid @@ -2276,10 +2291,7 @@ def __init__(self, *args, **kwargs): self._xpaths = ParentAwareXpath() self._stubs = VersionedStubs() - self._setup() - - if hasattr(self, "_setup_opstate") and callable(self._setup_opstate): - self._setup_opstate() + self._setups() try: params = super(VersionedPanObject, self).__getattribute__("_params") diff --git a/panos/panorama.py b/panos/panorama.py index a6ac54b1..330a2c0d 100644 --- a/panos/panorama.py +++ b/panos/panorama.py @@ -81,6 +81,9 @@ def _setup(self): self._params = tuple(params) + def _setup_opstate(self): + self.opstate = DeviceGroupOpState(self) + @property def vsys(self): return self.name @@ -91,6 +94,68 @@ def devicegroup(self): def xpath_vsys(self): return self.xpath() + def _setup_opstate(self): + self.opstate = DeviceGroupOpState(self) + + +class DeviceGroupOpState(object): + """Operational state handling for device group classes.""" + + def __init__(self, obj): + self.dg_hierarchy = DeviceGroupHierarchy(obj) + + +class DeviceGroupHierarchy(object): + """Operational state handling for device group hierarchy. + + Args: + parent (str): This device group's parent. + + """ + + def __init__(self, obj): + self.obj = obj + self.parent = None + + def refresh(self): + """Refresh the ``parent`` from the state.""" + + dev = self.obj.panorama() + state = dev.opstate.dg_hierarchy.fetch() + self.parent = state[self.obj.uid] + + def update(self): + """Change this device group's hierarchical parent. + + **Modifies the live device** + + This operation results in a job being submitted to the backend, which + this function will block until the move is completed. The return value of + this function is what is returned from + :meth:`panos.base.PanDevice.syncjob()`. + + Returns: + dict: Job result + + """ + dev = self.obj.panorama() + logger.debug( + '{0}: update hierarchical parent for "{1}": {2}'.format( + dev.id, self.obj.uid, self.parent + ) + ) + + e = ET.Element("request") + em = ET.SubElement(e, "move-dg") + eme = ET.SubElement(em, "entry", {"name": self.obj.name}) + + if self.parent is not None: + ET.SubElement(eme, "new-parent-dg").text = self.parent + + cmd = ET.tostring(e, encoding="utf-8") + resp = dev.op(cmd, cmd_xml=False) + return dev.syncjob(resp) + class Template(VersionedPanObject): """A panorama template. @@ -751,6 +816,47 @@ def get_vm_auth_keys(self): return ans + def _setup_opstate(self): + self.opstate = PanoramaOpState(self) + + +class PanoramaOpState(object): + """Panorama OP state handling.""" + + def __init__(self, obj): + self.dg_hierarchy = PanoramaDeviceGroupHierarchy(obj) + + +class PanoramaDeviceGroupHierarchy(object): + """Operational state handling for device group hierarchy.""" + + def __init__(self, obj): + self.obj = obj + self.parent = None + + def fetch(self): + """Returns a dict of device groups and their parents. + + Keys in the dict are the device group's name, while the value is the + name of that device group's parent. Top level device groups will have + a parent of ``None``. + + Returns: + dict + + """ + + resp = self.obj.op("show dg-hierarchy") + data = resp.find("./result/dg-hierarchy") + + ans = {} + nodes = [(None, x) for x in data.findall("./dg")] + for parent, elm in iter(nodes): + ans[elm.attrib["name"]] = parent + nodes.extend((elm.attrib["name"], x) for x in elm.findall("./dg")) + + return ans + class PanoramaCommit(object): """Normalization of a Panorama commit. diff --git a/tests/test_panorama.py b/tests/test_panorama.py new file mode 100644 index 00000000..18d2e179 --- /dev/null +++ b/tests/test_panorama.py @@ -0,0 +1,95 @@ +import xml.etree.ElementTree as ET + +try: + from unittest import mock +except ImportError: + import mock + +from panos.panorama import DeviceGroup +from panos.panorama import Panorama + + +def _device_group_hierarchy(): + pano = Panorama("127.0.0.1", "admin", "admin", "secret") + pano._version_info = (9999, 0, 0) + dg = DeviceGroup("drums") + pano.add(dg) + pano.op = mock.Mock( + return_value=ET.fromstring( + """ + + + + + + + + + + + + + + + + + + + + + +""", + ) + ) + + return dg + + +def test_panorama_dg_hierarchy_top_has_none_parent(): + dg = _device_group_hierarchy() + + ans = dg.parent.opstate.dg_hierarchy.fetch() + + for key in ("people", "solo group", "another solo group", "instruments", "parent"): + assert key in ans + assert ans[key] is None + + +def test_panorama_dg_hierarchy_first_level_child(): + dg = _device_group_hierarchy() + + ans = dg.parent.opstate.dg_hierarchy.fetch() + + fields = [ + ("people", "friends"), + ("instruments", "bass"), + ("instruments", "drums"), + ("instruments", "guitar"), + ("parent", "child"), + ] + + for parent, child in fields: + assert child in ans + assert ans[child] == parent + + +def test_panorama_dg_hierarchy_second_level_children(): + dg = _device_group_hierarchy() + + ans = dg.parent.opstate.dg_hierarchy.fetch() + + for field in ("jack", "jill"): + assert field in ans + assert ans[field] == "friends" + assert ans["friends"] == "people" + assert ans["people"] is None + + +def test_device_group_hierarchy_refresh(): + dg = _device_group_hierarchy() + + assert dg.opstate.dg_hierarchy.parent is None + + dg.opstate.dg_hierarchy.refresh() + + assert dg.opstate.dg_hierarchy.parent == "instruments"