Skip to content

Commit

Permalink
feat: Add hit count support (opstate) (#310)
Browse files Browse the repository at this point in the history
Fixes #239 - Adds `opstate.get_rule_hit_count()` to both the rulebases and
to all supported rule types.
  • Loading branch information
shinmog authored Mar 31, 2021
1 parent 3fa57e4 commit ca5ccc0
Show file tree
Hide file tree
Showing 3 changed files with 404 additions and 0 deletions.
3 changes: 3 additions & 0 deletions panos/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2278,6 +2278,9 @@ def __init__(self, *args, **kwargs):

self._setup()

if hasattr(self, "_setup_opstate") and callable(self._setup_opstate):
self._setup_opstate()

try:
params = super(VersionedPanObject, self).__getattribute__("_params")
except AttributeError:
Expand Down
161 changes: 161 additions & 0 deletions panos/policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

"""Policies module contains policies and rules that exist in the 'Policies' tab in the firewall GUI"""

import xml.etree.ElementTree as ET

import panos.errors as err
from panos import getlogger
from panos.base import ENTRY, MEMBER, PanObject, Root
Expand Down Expand Up @@ -44,6 +46,9 @@ class Rulebase(VersionedPanObject):
def _setup(self):
self._xpaths.add_profile(value="/rulebase")

def _setup_opstate(self):
self.opstate = RulebaseOpState(self)


class PreRulebase(Rulebase):
"""Pre-rulebase for a Panorama
Expand Down Expand Up @@ -119,6 +124,7 @@ class SecurityRule(VersionedPanObject):
# TODO: Add QoS variables
SUFFIX = ENTRY
ROOT = Root.VSYS
HIT_COUNT_STYLE = "security"

def _setup(self):
# xpaths
Expand Down Expand Up @@ -218,6 +224,9 @@ def _setup(self):

self._params = tuple(params)

def _setup_opstate(self):
self.opstate = RuleOpState(self)


class NatRule(VersionedPanObject):
"""NAT Rule
Expand Down Expand Up @@ -283,6 +292,7 @@ class NatRule(VersionedPanObject):

SUFFIX = ENTRY
ROOT = Root.VSYS
HIT_COUNT_STYLE = "nat"

def _setup(self):
# xpaths
Expand Down Expand Up @@ -566,6 +576,9 @@ def _setup(self):

self._params = tuple(params)

def _setup_opstate(self):
self.opstate = RuleOpState(self)


class PolicyBasedForwarding(VersionedPanObject):
"""PBF rule.
Expand Down Expand Up @@ -613,6 +626,7 @@ class PolicyBasedForwarding(VersionedPanObject):

SUFFIX = ENTRY
ROOT = Root.VSYS
HIT_COUNT_STYLE = "pbf"

def _setup(self):
# xpaths
Expand Down Expand Up @@ -750,3 +764,150 @@ def _setup(self):
params[-1].add_profile("9.0.0", vartype="attrib", path="uuid")

self._params = tuple(params)

def _setup_opstate(self):
self.opstate = RuleOpState(self)


class RulebaseOpState(object):
"""Operational state handling for rulebase classes."""

def __init__(self, obj):
self.hit_count = RulebaseHitCount(obj)


class RulebaseHitCount(object):
"""Operational state handling for rulebase hit counts."""

def __init__(self, obj):
self.obj = obj

def refresh(self, style, rules=None, all_rules=False):
"""Retrieves hit count information for the specified rules.
PAN-OS 8.1+
Args:
style (str): The rule style to use. The style can be
"application-override", "authentication", "decryption", "dos",
"nat", "pbf", "qos", "sdwan", "security", or "tunnel-inspect".
rules (list): A list of rules. This can be a mix of `panos.policies`
instances or basic strings. If no rules are given, then the hit
count for all rules is retrieved.
all_rules (bool): If this is False, only retrieve hit count information
for the rules attached to the rulebase of the specified style. If
this is True, then get all rules. Either way, any rule whose hit count
is retrieved and is in the object hierarchy has the hit count data
saved to its `opstate`.
Returns:
dict: A dict where the key is the rule name and the value is the hit count information.
"""
dev = self.obj.nearest_pandevice()

if dev.retrieve_panos_version() < (8, 1, 0):
raise err.PanDeviceError("Rule hit count is supported in PAN-OS 8.1+")

kids = []
for x in self.obj.children:
if not hasattr(x, "HIT_COUNT_STYLE") or x.HIT_COUNT_STYLE != style:
continue
if rules is None or x.uid in rules:
kids.append(x)

cmd = ET.Element("show")
sub = ET.SubElement(cmd, "rule-hit-count")
sub = ET.SubElement(sub, "vsys")
sub = ET.SubElement(sub, "vsys-name")
sub = ET.SubElement(sub, "entry", {"name": dev.vsys or "vsys1"})
sub = ET.SubElement(sub, "rule-base")
sub = ET.SubElement(sub, "entry", {"name": style})
sub = ET.SubElement(sub, "rules")

if all_rules:
ET.SubElement(sub, "all")
else:
# Loop over rules specified or the object hierarchy.
rule_list = rules or kids or []
if not rule_list:
return {}
sub = ET.SubElement(sub, "list")
for x in rule_list:
if hasattr(x, "uid"):
ET.SubElement(sub, "member").text = x.uid
else:
ET.SubElement(sub, "member").text = x

res = dev.op(ET.tostring(cmd, encoding="utf-8"), cmd_xml=False)

ans = {}
for elm in res.findall(
"./result/rule-hit-count/vsys/entry/rule-base/entry/rules/entry"
):
name = elm.attrib["name"]
for x in kids:
if x.uid == name:
x.opstate.hit_count.refresh(elm)
ans[name] = x.opstate.hit_count
break
else:
ans[name] = HitCount(name=name, elm=elm)

return ans


class HitCount(object):
"""Hit count operational data."""

FIELDS = (
("latest", "latest", "str"),
("hit_count", "hit-count", "int"),
("last_hit_timestamp", "last-hit-timestamp", "int"),
("last_reset_timestamp", "last-reset-timestamp", "int"),
("first_hit_timestamp", "first-hit-timestamp", "int"),
("rule_creation_timestamp", "rule-creation-timestamp", "int"),
("rule_modification_timestamp", "rule-modification-timestamp", "int"),
)

def __init__(self, obj=None, name=None, elm=None):
self.obj = obj
self.name = name if obj is None else obj.uid
self._refresh_xml(elm)

def refresh(self, elm=None):
if elm is None and self.obj is not None:
self.obj.parent.opstate.hit_count.refresh(
self.obj.HIT_COUNT_STYLE, [self.name,],
)
else:
self._refresh_xml(elm)

def _refresh_xml(self, elm):
for param, path, param_type in self.FIELDS:
if param_type == "int":
setattr(self, param, self._int(elm, path))
else:
setattr(self, param, self._str(elm, path))

def _str(self, elm, field):
if elm is None:
return

val = elm.find("./{0}".format(field))
if val is not None:
return val.text

def _int(self, elm, field):
val = self._str(elm, field)

if val is not None:
return int(val)


class RuleOpState(object):
"""Operational state handling for a rule in the rulebase."""

def __init__(self, obj):
self.obj = obj
self.hit_count = HitCount(obj)
Loading

0 comments on commit ca5ccc0

Please sign in to comment.