Skip to content

Commit

Permalink
feat: Add device group hierarchy support (#321)
Browse files Browse the repository at this point in the history
* feat: Add device group hierarchy support

This adds knowledge and handling of device group hierarchy to
pan-os-python.

Handling of this is moved to the `opstate` namespace since device
group hierarchy is not stored in the XPATH of PAN-OS.

Note that the "async" job to update a device group's parent seems
to finish instantaneously.  Most times in my testing the job has
already finished by the time I could query the job status with
pan-os-python.
  • Loading branch information
shinmog committed May 6, 2021
1 parent 1b7f1aa commit ef90979
Show file tree
Hide file tree
Showing 3 changed files with 217 additions and 4 deletions.
20 changes: 16 additions & 4 deletions panos/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,21 @@ def __init__(self, *args, **kwargs):
# Store the value in the instance variable
setattr(self, varname, varvalue)

self._setups()

def _setups(self):
"""The various setup functions that will be called on object creation."""
funcs = [
"_setup",
"_setup_opstate",
]

for func in funcs:
if hasattr(self, func):
f = getattr(self, func)
if callable(f):
f()

def __str__(self):
return self.uid

Expand Down Expand Up @@ -2276,10 +2291,7 @@ def __init__(self, *args, **kwargs):
self._xpaths = ParentAwareXpath()
self._stubs = VersionedStubs()

self._setup()

if hasattr(self, "_setup_opstate") and callable(self._setup_opstate):
self._setup_opstate()
self._setups()

try:
params = super(VersionedPanObject, self).__getattribute__("_params")
Expand Down
106 changes: 106 additions & 0 deletions panos/panorama.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ def _setup(self):

self._params = tuple(params)

def _setup_opstate(self):
self.opstate = DeviceGroupOpState(self)

@property
def vsys(self):
return self.name
Expand All @@ -91,6 +94,68 @@ def devicegroup(self):
def xpath_vsys(self):
return self.xpath()

def _setup_opstate(self):
self.opstate = DeviceGroupOpState(self)


class DeviceGroupOpState(object):
"""Operational state handling for device group classes."""

def __init__(self, obj):
self.dg_hierarchy = DeviceGroupHierarchy(obj)


class DeviceGroupHierarchy(object):
"""Operational state handling for device group hierarchy.
Args:
parent (str): This device group's parent.
"""

def __init__(self, obj):
self.obj = obj
self.parent = None

def refresh(self):
"""Refresh the ``parent`` from the state."""

dev = self.obj.panorama()
state = dev.opstate.dg_hierarchy.fetch()
self.parent = state[self.obj.uid]

def update(self):
"""Change this device group's hierarchical parent.
**Modifies the live device**
This operation results in a job being submitted to the backend, which
this function will block until the move is completed. The return value of
this function is what is returned from
:meth:`panos.base.PanDevice.syncjob()`.
Returns:
dict: Job result
"""
dev = self.obj.panorama()
logger.debug(
'{0}: update hierarchical parent for "{1}": {2}'.format(
dev.id, self.obj.uid, self.parent
)
)

e = ET.Element("request")
em = ET.SubElement(e, "move-dg")
eme = ET.SubElement(em, "entry", {"name": self.obj.name})

if self.parent is not None:
ET.SubElement(eme, "new-parent-dg").text = self.parent

cmd = ET.tostring(e, encoding="utf-8")
resp = dev.op(cmd, cmd_xml=False)
return dev.syncjob(resp)


class Template(VersionedPanObject):
"""A panorama template.
Expand Down Expand Up @@ -751,6 +816,47 @@ def get_vm_auth_keys(self):

return ans

def _setup_opstate(self):
self.opstate = PanoramaOpState(self)


class PanoramaOpState(object):
"""Panorama OP state handling."""

def __init__(self, obj):
self.dg_hierarchy = PanoramaDeviceGroupHierarchy(obj)


class PanoramaDeviceGroupHierarchy(object):
"""Operational state handling for device group hierarchy."""

def __init__(self, obj):
self.obj = obj
self.parent = None

def fetch(self):
"""Returns a dict of device groups and their parents.
Keys in the dict are the device group's name, while the value is the
name of that device group's parent. Top level device groups will have
a parent of ``None``.
Returns:
dict
"""

resp = self.obj.op("show dg-hierarchy")
data = resp.find("./result/dg-hierarchy")

ans = {}
nodes = [(None, x) for x in data.findall("./dg")]
for parent, elm in iter(nodes):
ans[elm.attrib["name"]] = parent
nodes.extend((elm.attrib["name"], x) for x in elm.findall("./dg"))

return ans


class PanoramaCommit(object):
"""Normalization of a Panorama commit.
Expand Down
95 changes: 95 additions & 0 deletions tests/test_panorama.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import xml.etree.ElementTree as ET

try:
from unittest import mock
except ImportError:
import mock

from panos.panorama import DeviceGroup
from panos.panorama import Panorama


def _device_group_hierarchy():
pano = Panorama("127.0.0.1", "admin", "admin", "secret")
pano._version_info = (9999, 0, 0)
dg = DeviceGroup("drums")
pano.add(dg)
pano.op = mock.Mock(
return_value=ET.fromstring(
"""
<response code="19" status="success">
<result>
<dg-hierarchy>
<dg dg_id="55" name="people">
<dg dg_id="54" name="friends">
<dg dg_id="57" name="jack" />
<dg dg_id="58" name="jill" />
</dg>
</dg>
<dg dg_id="11" name="solo group" />
<dg dg_id="44" name="another solo group" />
<dg dg_id="69" name="instruments">
<dg dg_id="71" name="bass" />
<dg dg_id="72" name="drums" />
<dg dg_id="73" name="guitar" />
</dg>
<dg dg_id="100" name="parent">
<dg dg_id="101" name="child" />
</dg>
</dg-hierarchy>
</result>
</response>""",
)
)

return dg


def test_panorama_dg_hierarchy_top_has_none_parent():
dg = _device_group_hierarchy()

ans = dg.parent.opstate.dg_hierarchy.fetch()

for key in ("people", "solo group", "another solo group", "instruments", "parent"):
assert key in ans
assert ans[key] is None


def test_panorama_dg_hierarchy_first_level_child():
dg = _device_group_hierarchy()

ans = dg.parent.opstate.dg_hierarchy.fetch()

fields = [
("people", "friends"),
("instruments", "bass"),
("instruments", "drums"),
("instruments", "guitar"),
("parent", "child"),
]

for parent, child in fields:
assert child in ans
assert ans[child] == parent


def test_panorama_dg_hierarchy_second_level_children():
dg = _device_group_hierarchy()

ans = dg.parent.opstate.dg_hierarchy.fetch()

for field in ("jack", "jill"):
assert field in ans
assert ans[field] == "friends"
assert ans["friends"] == "people"
assert ans["people"] is None


def test_device_group_hierarchy_refresh():
dg = _device_group_hierarchy()

assert dg.opstate.dg_hierarchy.parent is None

dg.opstate.dg_hierarchy.refresh()

assert dg.opstate.dg_hierarchy.parent == "instruments"

0 comments on commit ef90979

Please sign in to comment.