From 40940edfd43daae2c5bb59f53fd5b15bd2b400a0 Mon Sep 17 00:00:00 2001 From: bekelly Date: Mon, 4 Nov 2019 12:32:23 +0800 Subject: [PATCH 1/5] Initial commit of prefilter support --- fmcapi/api_objects/__init__.py | 2 + .../policy_services/prefilterpolicies.py | 33 ++- .../policy_services/prefilterrules.py | 192 +++++++++++++++++- 3 files changed, 214 insertions(+), 13 deletions(-) diff --git a/fmcapi/api_objects/__init__.py b/fmcapi/api_objects/__init__.py index 3180d52..159b93a 100644 --- a/fmcapi/api_objects/__init__.py +++ b/fmcapi/api_objects/__init__.py @@ -139,6 +139,7 @@ from .policy_services.natrules import NatRules from .policy_services.prefilterpolicies import PreFilterPolicies from .policy_services.prefilterpolicies import PreFilterPolicy +from .policy_services.prefilterrules import PreFilterRules from .policy_assignment_services.policyassignments import PolicyAssignments @@ -280,6 +281,7 @@ "IntrusionPolicy", "PreFilterPolicies", "PreFilterPolicy", + "PreFilterRules", "HitCounts", "HitCount", "TaskStatuses", diff --git a/fmcapi/api_objects/policy_services/prefilterpolicies.py b/fmcapi/api_objects/policy_services/prefilterpolicies.py index 2568e0a..a4a75dc 100644 --- a/fmcapi/api_objects/policy_services/prefilterpolicies.py +++ b/fmcapi/api_objects/policy_services/prefilterpolicies.py @@ -8,29 +8,44 @@ class PreFilterPolicies(APIClassTemplate): The PreFilterPolicies Object in the FMC. """ - VALID_JSON_DATA = ["id", "name"] + VALID_JSON_DATA = ["id", "name", "type", "description", "defaultAction"] VALID_FOR_KWARGS = VALID_JSON_DATA + [] URL_SUFFIX = "/policy/prefilterpolicies" VALID_CHARACTERS_FOR_NAME = """[.\w\d_\- ]""" + REQUIRED_FOR_POST = ["name"] + DEFAULT_ACTION_OPTIONS = ["ANALYZE_TUNNELS", "BOCK_TUNNELS"] def __init__(self, fmc, **kwargs): super().__init__(fmc, **kwargs) logging.debug("In __init__() for PreFilterPolicies class.") self.parse_kwargs(**kwargs) self.type = "PreFilterPolicy" - - def post(self): - logging.info("POST method for API for PreFilterPolicies not supported.") - pass + self._defaultAction = None + self.defaultAction = "ANALYZE_TUNNELS" + + @property + def defaultAction(self): + return {"type": "PrefilterPolicyDefaultAction", "action": self._defaultAction} + + @defaultAction.setter + def defaultAction(self, action): + if action in self.DEFAULT_ACTION_OPTIONS: + self._defaultAction = action + else: + logging.error( + f"action, {action}, is not a valid option. Choose from {self.DEFAULT_ACTION_OPTIONS}." + ) + + def format_data(self): + json_data = super().format_data() + logging.debug("In format_data() for AccessPolicies class.") + json_data["defaultAction"] = self.defaultAction + return json_data def put(self): logging.info("PUT method for API for PreFilterPolicies not supported.") pass - def delete(self): - logging.info("DELETE method for API for PreFilterPolicies not supported.") - pass - class PreFilterPolicy(PreFilterPolicies): """Dispose of this Class after 20210101.""" diff --git a/fmcapi/api_objects/policy_services/prefilterrules.py b/fmcapi/api_objects/policy_services/prefilterrules.py index f63be69..b1d9d35 100644 --- a/fmcapi/api_objects/policy_services/prefilterrules.py +++ b/fmcapi/api_objects/policy_services/prefilterrules.py @@ -1,4 +1,188 @@ -""" -Not yet implemented -added FMC v6.5.0 -""" +from fmcapi.api_objects.apiclasstemplate import APIClassTemplate +from fmcapi.api_objects.policy_services.prefilterpolicies import PreFilterPolicies +from fmcapi.api_objects.object_services.securityzones import SecurityZones +import logging + + +class PreFilterRules(APIClassTemplate): + """ + The PreFilterRules object in the FMC + """ + VALID_JSON_DATA = [ + "id", + "name", + "action" + ] + PREFIX_URL = '/policy/prefilterpolicies' + VALID_FOR_ACTION = ['FASTPATH', 'ANALYZE', 'BLOCK'] + REQUIRED_FOR_POST = ['prefilter_id'] + REQUIRED_FOR_GET = ['prefilter_id'] + + def __init__(self, fmc, **kwargs): + super().__init__(fmc, **kwargs) + logging.debug("In __init__() for PreFilterRules class.") + self.type = "PreFilterRules" + self.prefilter_id = None + self.prefilter_added_to_url = False + self.action = 'FASTPATH' + self.parse_kwargs(**kwargs) + self.URL = f"{self.URL}{self.URL_SUFFIX}" + + @property + def URL_SUFFIX(self): + """ + Add the URL suffixes for insertBefore and insertAfter + NOTE: You must specify these at the time of the object is initialized + for this to work correctly. Example: + This works: + new_rule = PreFilterRules(fmc=fmc, prefilter_name='pre1', insertBefore=2) + + This does not: + new_rule = PreFilterRules(fmc=fmc, prefilter_name='pre1') + new_rule.insertBefore = 2 + """ + url = "?" + if "insertBefore" in self.__dict__: + url = f"{url}insertBefore={self.insertBefore}" + elif "insertAfter" in self.__dict__: + url = f"{url}insertAfter={self.insertAfter}" + + return url + + def parse_kwargs(self, **kwargs): + """ + Parse the kwargs and load into object properties + Args: + kwargs (dict): Keyword arguments + """ + super().parse_kwargs(**kwargs) + logging.debug("In parse_kwargs() for PreFilterRules class") + + if "prefilter_id" in kwargs: + self.prefilter(prefilter_id=kwargs["prefilter_id"]) + if 'prefilter_name' in kwargs: + self.prefilter(name=kwargs['prefilter_name']) + if 'action' in kwargs: + self.validate_action(kwargs['action']) + + def prefilter(self, name=None, prefilter_id=None): + """ + If the name has been supplied, go and pull the prefilter ID. If ID has been specified, add it to self. + Also create the URL for prefilter policy + Args: + name (str): Prefilter name + prefilter_id (str): UUID of prefilter + """ + logging.debug("In prefilter() for PreFilterRules") + if not name and not prefilter_id: + logging.warning(f'Unable to find prefilter as no name or ID was specified') + + elif name: + logging.info(f'Searching ID for prefilter "{name}"') + pre1 = PreFilterPolicies(fmc=self.fmc) + pre1.get(name=name) + + if 'id' in pre1.__dict__: + prefilter_id = pre1.id + else: + logging.warning(f'Prefilter policy {name} not found. Cannot setup perfilter policy for PreFilterRules') + + self.prefilter_id = prefilter_id + self.URL = f'{self.fmc.configuration_url}{self.PREFIX_URL}/{self.prefilter_id}/prefilterrules' + self.prefilter_added_to_url = True + + def validate_action(self, action): + """ + Checks the provided action is valid and sets property + Args: + action (str): Prefilter rule action + """ + if action in self.VALID_FOR_ACTION: + self.action = action + else: + logging.warning(f'Action {action} is not a valid option\nValid actions are: {self.VALID_FOR_ACTION}') + + def source_zone(self, action, name=''): + """ + Set the source zone information + Args: + action (str): "add" or "remove" + name (str): Name of interface zone object + """ + logging.debug('In source_zone() for PreFilterRules class.') + if action == 'clear': + if 'sourceZones' in self.__dict__: + del self.sourceZones + logging.info(f'Removed source zone from prefilter rule') + + elif action == 'add': + src_zone_id, src_zone_type = self.get_zone_id(name) + if src_zone_id: + self.add_zone('sourceZones', name, src_zone_id, src_zone_type) + else: + logging.warning(f'Security zone object "{name}" not found on FMC') + + def destination_zone(self, action, name=''): + """ + Set the destination zone information + Args: + action (str): "add" or "remove" + name (str): Name of interface zone object + """ + logging.debug('In source_zone() for PreFilterRules class.') + if action == 'clear': + if 'sourceZones' in self.__dict__: + del self.destinationZones + logging.info(f'Removed source zone from prefilter rule') + + elif action == 'add': + dst_zone_id, dst_zone_type = self.get_zone_id(name) + if dst_zone_id: + self.add_zone('destinationZones', name, dst_zone_id, dst_zone_type) + else: + logging.warning(f'Security zone object "{name}" not found on FMC') + + def get_zone_id(self, name): + """ + Pull the ID for a security zone + Args: + name (str): Name of interface zone object + + Returns: + UUID of zone object (str) + """ + sec_zone = SecurityZones(fmc=self.fmc) + sec_zone.get(name=name) + if 'id' in sec_zone.__dict__: + return sec_zone.id, sec_zone.type + else: + return None, None + + def add_zone(self, target, name, id, zone_type): + """ + Check if zone is already on object, skip if it is, add if it isn't and create sourceZone if that object + attribute doesn't exist at all + Args: + target (str): "sourceZones" or "destinationZones" + name (str): Name of zone object + id (str): UUID of zone object + zone_type (str): Security zone type + """ + + if target in self.__dict__: + + # Look through existing zones, if present, for the zone name. Skip if it's already there + zone_name_duplicate = False + zone_list = [] + for zone in getattr(self, target)['objects']: + zone_list.append(zone) + if zone['name'] == name: + zone_name_duplicate = True + + if not zone_name_duplicate: + zone_list.append({'name': name, 'id': id, 'type': zone_type}) + setattr(self, target, {'objects': zone_list}) + + # Set the zone if it doesn't exist + else: + setattr(self, target, {'objects': [{'name': name, 'id': id, 'type': zone_type}]}) \ No newline at end of file From e665d91fcfd530081a14fe1d467aef0ca65ae666 Mon Sep 17 00:00:00 2001 From: bekelly Date: Mon, 25 Nov 2019 11:34:52 +0800 Subject: [PATCH 2/5] Added support for - source/destination networks (literals and objects) - source/destination interfaces --- .../policy_services/prefilterpolicies.py | 1 + .../policy_services/prefilterrules.py | 255 +++++++++++++++--- unit_tests/__init__.py | 4 +- unit_tests/prefilter.py | 23 ++ unit_tests/prefilter_rule.py | 68 +++++ 5 files changed, 311 insertions(+), 40 deletions(-) create mode 100644 unit_tests/prefilter.py create mode 100644 unit_tests/prefilter_rule.py diff --git a/fmcapi/api_objects/policy_services/prefilterpolicies.py b/fmcapi/api_objects/policy_services/prefilterpolicies.py index a4a75dc..8c86499 100644 --- a/fmcapi/api_objects/policy_services/prefilterpolicies.py +++ b/fmcapi/api_objects/policy_services/prefilterpolicies.py @@ -14,6 +14,7 @@ class PreFilterPolicies(APIClassTemplate): VALID_CHARACTERS_FOR_NAME = """[.\w\d_\- ]""" REQUIRED_FOR_POST = ["name"] DEFAULT_ACTION_OPTIONS = ["ANALYZE_TUNNELS", "BOCK_TUNNELS"] + FIRST_SUPPORTED_FMC_VERSION = "6.5" def __init__(self, fmc, **kwargs): super().__init__(fmc, **kwargs) diff --git a/fmcapi/api_objects/policy_services/prefilterrules.py b/fmcapi/api_objects/policy_services/prefilterrules.py index b1d9d35..73892d3 100644 --- a/fmcapi/api_objects/policy_services/prefilterrules.py +++ b/fmcapi/api_objects/policy_services/prefilterrules.py @@ -1,6 +1,10 @@ from fmcapi.api_objects.apiclasstemplate import APIClassTemplate from fmcapi.api_objects.policy_services.prefilterpolicies import PreFilterPolicies from fmcapi.api_objects.object_services.securityzones import SecurityZones +from fmcapi.api_objects.helper_functions import get_networkaddress_type +from fmcapi.api_objects.object_services.fqdns import FQDNS +from fmcapi.api_objects.object_services.networkgroups import NetworkGroups +from fmcapi.api_objects.object_services.networkaddresses import NetworkAddresses import logging @@ -8,15 +12,24 @@ class PreFilterRules(APIClassTemplate): """ The PreFilterRules object in the FMC """ + VALID_JSON_DATA = [ "id", "name", - "action" + "action", + "sourceNetworks", + "destinationNetworks", + "sourceInterfaces", + "destinationInterfaces", + "ruleType", + "type", + "enabled", ] - PREFIX_URL = '/policy/prefilterpolicies' - VALID_FOR_ACTION = ['FASTPATH', 'ANALYZE', 'BLOCK'] - REQUIRED_FOR_POST = ['prefilter_id'] - REQUIRED_FOR_GET = ['prefilter_id'] + PREFIX_URL = "/policy/prefilterpolicies" + VALID_FOR_ACTION = ["FASTPATH", "ANALYZE", "BLOCK"] + VALID_FOR_RULETYPE = ["TUNNEL", "PREFILTER"] + REQUIRED_FOR_POST = ["prefilter_id"] + REQUIRED_FOR_GET = ["prefilter_id"] def __init__(self, fmc, **kwargs): super().__init__(fmc, **kwargs) @@ -24,9 +37,12 @@ def __init__(self, fmc, **kwargs): self.type = "PreFilterRules" self.prefilter_id = None self.prefilter_added_to_url = False - self.action = 'FASTPATH' + self.action = "FASTPATH" self.parse_kwargs(**kwargs) self.URL = f"{self.URL}{self.URL_SUFFIX}" + self.ruleType = "PREFILTER" + self.type = "PrefilterRule" + self.enabled = False @property def URL_SUFFIX(self): @@ -60,10 +76,10 @@ def parse_kwargs(self, **kwargs): if "prefilter_id" in kwargs: self.prefilter(prefilter_id=kwargs["prefilter_id"]) - if 'prefilter_name' in kwargs: - self.prefilter(name=kwargs['prefilter_name']) - if 'action' in kwargs: - self.validate_action(kwargs['action']) + if "prefilter_name" in kwargs: + self.prefilter(name=kwargs["prefilter_name"]) + if "action" in kwargs: + self.validate_action(kwargs["action"]) def prefilter(self, name=None, prefilter_id=None): """ @@ -75,20 +91,22 @@ def prefilter(self, name=None, prefilter_id=None): """ logging.debug("In prefilter() for PreFilterRules") if not name and not prefilter_id: - logging.warning(f'Unable to find prefilter as no name or ID was specified') + logging.warning(f"Unable to find prefilter as no name or ID was specified") elif name: logging.info(f'Searching ID for prefilter "{name}"') pre1 = PreFilterPolicies(fmc=self.fmc) pre1.get(name=name) - if 'id' in pre1.__dict__: + if "id" in pre1.__dict__: prefilter_id = pre1.id else: - logging.warning(f'Prefilter policy {name} not found. Cannot setup perfilter policy for PreFilterRules') + logging.warning( + f"Prefilter policy {name} not found. Cannot setup perfilter policy for PreFilterRules" + ) self.prefilter_id = prefilter_id - self.URL = f'{self.fmc.configuration_url}{self.PREFIX_URL}/{self.prefilter_id}/prefilterrules' + self.URL = f"{self.fmc.configuration_url}{self.PREFIX_URL}/{self.prefilter_id}/prefilterrules" self.prefilter_added_to_url = True def validate_action(self, action): @@ -100,45 +118,47 @@ def validate_action(self, action): if action in self.VALID_FOR_ACTION: self.action = action else: - logging.warning(f'Action {action} is not a valid option\nValid actions are: {self.VALID_FOR_ACTION}') + logging.warning( + f"Action {action} is not a valid option\nValid actions are: {self.VALID_FOR_ACTION}" + ) - def source_zone(self, action, name=''): + def source_interface(self, action, name=""): """ - Set the source zone information + Set the source interface (zone) information Args: action (str): "add" or "remove" name (str): Name of interface zone object """ - logging.debug('In source_zone() for PreFilterRules class.') - if action == 'clear': - if 'sourceZones' in self.__dict__: - del self.sourceZones - logging.info(f'Removed source zone from prefilter rule') + logging.debug("In source_zone() for PreFilterRules class.") + if action == "clear": + if "sourceInterfaces" in self.__dict__: + del self.sourceInterfaces + logging.info(f"Removed source zone from prefilter rule") - elif action == 'add': + elif action == "add": src_zone_id, src_zone_type = self.get_zone_id(name) if src_zone_id: - self.add_zone('sourceZones', name, src_zone_id, src_zone_type) + self.add_zone("sourceInterfaces", name, src_zone_id, src_zone_type) else: logging.warning(f'Security zone object "{name}" not found on FMC') - def destination_zone(self, action, name=''): + def destination_interface(self, action, name=""): """ - Set the destination zone information + Set the destination interface (zone) information Args: action (str): "add" or "remove" name (str): Name of interface zone object """ - logging.debug('In source_zone() for PreFilterRules class.') - if action == 'clear': - if 'sourceZones' in self.__dict__: - del self.destinationZones - logging.info(f'Removed source zone from prefilter rule') + logging.debug("In source_zone() for PreFilterRules class.") + if action == "clear": + if "sourceZones" in self.__dict__: + del self.destinationInterfaces + logging.info(f"Removed source zone from prefilter rule") - elif action == 'add': + elif action == "add": dst_zone_id, dst_zone_type = self.get_zone_id(name) if dst_zone_id: - self.add_zone('destinationZones', name, dst_zone_id, dst_zone_type) + self.add_zone("destinationInterfaces", name, dst_zone_id, dst_zone_type) else: logging.warning(f'Security zone object "{name}" not found on FMC') @@ -153,7 +173,7 @@ def get_zone_id(self, name): """ sec_zone = SecurityZones(fmc=self.fmc) sec_zone.get(name=name) - if 'id' in sec_zone.__dict__: + if "id" in sec_zone.__dict__: return sec_zone.id, sec_zone.type else: return None, None @@ -174,15 +194,172 @@ def add_zone(self, target, name, id, zone_type): # Look through existing zones, if present, for the zone name. Skip if it's already there zone_name_duplicate = False zone_list = [] - for zone in getattr(self, target)['objects']: + for zone in getattr(self, target)["objects"]: zone_list.append(zone) - if zone['name'] == name: + if zone["name"] == name: zone_name_duplicate = True if not zone_name_duplicate: - zone_list.append({'name': name, 'id': id, 'type': zone_type}) - setattr(self, target, {'objects': zone_list}) + zone_list.append({"name": name, "id": id, "type": zone_type}) + setattr(self, target, {"objects": zone_list}) # Set the zone if it doesn't exist else: - setattr(self, target, {'objects': [{'name': name, 'id': id, 'type': zone_type}]}) \ No newline at end of file + setattr( + self, target, {"objects": [{"name": name, "id": id, "type": zone_type}]} + ) + + def source_network(self, action, name=None, literal=None): + """ + Adds either an object (name) or a literal to sourceNetworks + Args: + action (str): Action to be done on this object + name (str): Object name + literal (str): Host, network or range + + """ + logging.debug("In source_network() for PreFilterRules class.") + if literal and name: + raise ValueError( + "Adding source literal and object at the same time not supported" + ) + + if not hasattr(self, "sourceNetworks"): + self.sourceNetworks = {"objects": [], "literals": []} + + if action == "add" and literal: + literal_type = get_networkaddress_type(literal) + self.sourceNetworks["literals"].append( + {"type": literal_type, "value": literal} + ) + + elif action == "add" and name: + new_object = self.find_object(name) + + # Check if object is already in the list and if not, then add it + if new_object and new_object not in self.sourceNetworks["objects"]: + logging.info(f'Adding "{name}" to sourceNetworks for prefilter rule') + self.sourceNetworks["objects"].append(new_object) + + def destination_network(self, action, name=None, literal=None): + """ + Adds either an object (name) or a literal to destinationNetworks + Args: + action (str): Action to be done on this object + name (str): Object name + literal (str): Host, network or range + + """ + logging.debug("In destination_network() for PreFilterRules class.") + if literal and name: + raise ValueError( + "Adding source literal and object at the same time not supported" + ) + + if not hasattr(self, "destinationNetworks"): + self.destinationNetworks = {"objects": [], "literals": []} + + if action == "add" and literal: + literal_type = get_networkaddress_type(literal) + self.destinationNetworks["literals"].append( + {"type": literal_type, "value": literal} + ) + + elif action == "add" and name: + new_object = self.find_object(name) + + # Check if object is already in the list and if not, then add it + if new_object and new_object not in self.destinationNetworks["objects"]: + logging.info( + f'Adding "{name}" to destinationNetworks for prefilter rule' + ) + self.destinationNetworks["objects"].append(new_object) + + def find_object(self, name): + """ + Search through IP net objects, network group objects and FQDN objects by name + Args: + name (str): Name of object + Returns: + Object details (dict) or None + """ + object_id, object_type = self.find_network_object(name) + if object_id: + return {"name": name, "id": object_id, "type": object_type} + + object_id, object_type = self.find_group_object(name) + if object_id: + return {"name": name, "id": object_id, "type": object_type} + + object_id, object_type = self.find_fqdn_object(name) + if object_id: + return {"name": name, "id": object_id, "type": object_type} + + return None + + def find_network_object(self, name): + """ + Search for network object by name + Args: + name (str): Name of network object + Returns: + id (str), type (str) or None None + """ + network_object = NetworkAddresses(fmc=self.fmc, name=name) + resp = network_object.get() + + return self._return_id_type(resp) + + def find_group_object(self, name): + """ + Search for group object by name + Args: + name (str): Name of group object + Returns: + id (str), type (str) or None None + """ + group_object = NetworkGroups(fmc=self.fmc, name=name) + resp = group_object.get() + + return self._return_id_type(resp) + + def find_fqdn_object(self, name): + """ + Search for group object by name + Args: + name (str): Name of group object + Returns: + id (str), type (str) or None None + """ + fqdn_object = FQDNS(fmc=self.fmc, name=name) + resp = fqdn_object.get() + + return self._return_id_type(resp) + + @staticmethod + def _return_id_type(resp): + """ + Check dict for ID and return ID and type if present + Args: + resp (dict): Response from NetworkGroup, NetworkAddress or FQDNS GET + Returns: + Returns: + id (str), type (str) or None None + """ + if "id" in resp.keys(): + return resp["id"], resp["type"] + else: + return None, None + + def rule_type(self, rule_type): + """ + Set the rule type attribute + Args: + rule_type (str): PREFITLER or TUNNEL + """ + if rule_type in self.VALID_FOR_RULETYPE: + self.ruleType = rule_type + else: + logging.warning( + f'Rule type "{rule_type}" is not valid. Must be "PREFILTER" or "TUNNEL"' + ) diff --git a/unit_tests/__init__.py b/unit_tests/__init__.py index 8f155f1..5e4863f 100644 --- a/unit_tests/__init__.py +++ b/unit_tests/__init__.py @@ -60,7 +60,8 @@ from .staticroutes import test__staticroutes from .ipv4staticroutes import test__ipv4staticroutes from .ipv6staticroutes import test__ipv6staticroutes - +from .prefilter import test__prefilter_policy +from .prefilter_rule import test__prefiler_rule from .s2s_vpn import test__ftds2svpns logging.debug("In the unit-tests __init__.py file.") @@ -126,4 +127,5 @@ "test__acp_rule", "test__access_control_policy", "test__intrusion_policy", + "test__prefilter_policy", ] diff --git a/unit_tests/prefilter.py b/unit_tests/prefilter.py new file mode 100644 index 0000000..dc241af --- /dev/null +++ b/unit_tests/prefilter.py @@ -0,0 +1,23 @@ +import logging +import fmcapi +import time + + +def test__prefilter_policy(fmc): + logging.info("Test PreFilterPolicies. Post, get, put, delete ACP Objects.") + + starttime = str(int(time.time())) + namer = f"_fmcapi_test_{starttime}" + + obj1 = fmcapi.PreFilterPolicies(fmc=fmc) + obj1.name = namer + obj1.post() + time.sleep(1) + del obj1 + obj1 = fmcapi.PreFilterPolicies(fmc=fmc, name=namer) + obj1.get() + obj1.name = "asdfasdf" + obj1.put() + time.sleep(1) + obj1.delete() + logging.info("Test PreFilterPolicies done.\n") diff --git a/unit_tests/prefilter_rule.py b/unit_tests/prefilter_rule.py new file mode 100644 index 0000000..d659537 --- /dev/null +++ b/unit_tests/prefilter_rule.py @@ -0,0 +1,68 @@ +import logging +import time +from fmcapi import ( + Hosts, + Networks, + Ranges, + FQDNS, + NetworkGroups, + PreFilterPolicies, + PreFilterRules, + SecurityZones, +) + + +def test__prefiler_rule(fmc): + logging.info("Testing prefilter rules on FMC") + logging.info("Creating objects for testing on FMC") + + namer = f"fmcapi_test_{str(int(time.time()))}" + ip_host_1 = Hosts(fmc=fmc, name=f"test_host_1_{namer}", value="7.7.7.7") + ip_host_1.post() + ip_net_1 = Networks(fmc=fmc, name=f"test_net_1_{namer}", value="10.0.0.0/8") + ip_net_1.post() + ip_range_1 = Ranges( + fmc=fmc, name=f"test_range_1_{namer}", value="10.1.1.1-10.1.1.10" + ) + ip_range_1.post() + fqdn_1 = FQDNS(fmc=fmc, name=f"test_fqdn_1_{namer}", value="www.cisco.com") + fqdn_1.post() + net_group_1 = NetworkGroups(fmc=fmc, name=f"net_group_1_{namer}") + net_group_1.named_networks(action="add", name=f"test_net_1_{namer}") + net_group_1.post() + sec_zone_1 = SecurityZones( + fmc=fmc, name=f"test_zone_1_{namer}", interfaceMode="ROUTED" + ) + sec_zone_1.post() + sec_zone_2 = SecurityZones( + fmc=fmc, name=f"test_zone_2_{namer}", interfaceMode="ROUTED" + ) + sec_zone_2.post() + + logging.info(f'Creating test prefiler "{namer}"') + prefilter = PreFilterPolicies(fmc=fmc, name=namer) + prefilter.post() + time.sleep(1) + + logging.info(f'Creating test prefiler rule "test_1"') + prefilter_rule_1 = PreFilterRules(fmc=fmc, prefilter_name=namer) + prefilter_rule_1.name = "test_1" + prefilter_rule_1.enabled = True + prefilter_rule_1.source_interface(action="add", name=f"test_zone_1_{namer}") + prefilter_rule_1.destination_interface(action="add", name=f"test_zone_2_{namer}") + prefilter_rule_1.source_network(action="add", literal="10.1.1.1") + prefilter_rule_1.destination_network(action="add", name=f"test_host_1_{namer}") + prefilter_rule_1.destination_network(action="add", name=f"test_net_1_{namer}") + prefilter_rule_1.destination_network(action="add", name=f"test_range_1_{namer}") + prefilter_rule_1.destination_network(action="add", name=f"test_fqdn_1_{namer}") + prefilter_rule_1.post() + time.sleep(1) + + prefilter.delete() + ip_host_1.delete() + ip_net_1.delete() + ip_range_1.delete() + fqdn_1.delete() + net_group_1.delete() + sec_zone_1.delete() + sec_zone_2.delete() From 178b1f9a3644e67dfe9af6033b1d7b0a6817aaf7 Mon Sep 17 00:00:00 2001 From: bekelly Date: Tue, 26 Nov 2019 14:50:38 +0800 Subject: [PATCH 3/5] Added source and destination ports Added remove and clear functions for source/destination networks --- .../policy_services/prefilterrules.py | 160 +++++++++++++++++- unit_tests/prefilter_rule.py | 46 +++++ 2 files changed, 203 insertions(+), 3 deletions(-) diff --git a/fmcapi/api_objects/policy_services/prefilterrules.py b/fmcapi/api_objects/policy_services/prefilterrules.py index 73892d3..eb3e4d9 100644 --- a/fmcapi/api_objects/policy_services/prefilterrules.py +++ b/fmcapi/api_objects/policy_services/prefilterrules.py @@ -5,6 +5,8 @@ from fmcapi.api_objects.object_services.fqdns import FQDNS from fmcapi.api_objects.object_services.networkgroups import NetworkGroups from fmcapi.api_objects.object_services.networkaddresses import NetworkAddresses +from fmcapi.api_objects.object_services.protocolportobjects import ProtocolPortObjects +from fmcapi.api_objects.object_services.portobjectgroups import PortObjectGroups import logging @@ -21,6 +23,8 @@ class PreFilterRules(APIClassTemplate): "destinationNetworks", "sourceInterfaces", "destinationInterfaces", + "sourcePorts", + "destinationPorts", "ruleType", "type", "enabled", @@ -223,6 +227,7 @@ def source_network(self, action, name=None, literal=None): raise ValueError( "Adding source literal and object at the same time not supported" ) + return if not hasattr(self, "sourceNetworks"): self.sourceNetworks = {"objects": [], "literals": []} @@ -232,15 +237,26 @@ def source_network(self, action, name=None, literal=None): self.sourceNetworks["literals"].append( {"type": literal_type, "value": literal} ) + return - elif action == "add" and name: + if name: new_object = self.find_object(name) + if action == "add": # Check if object is already in the list and if not, then add it if new_object and new_object not in self.sourceNetworks["objects"]: logging.info(f'Adding "{name}" to sourceNetworks for prefilter rule') self.sourceNetworks["objects"].append(new_object) + elif action == "remove": + index = self.sourceNetworks["objects"].index(new_object) + logging.info(f'Removing "{new_object}" from sourceNetworks') + self.sourceNetworks["objects"].pop(index) + + elif action == "clear": + logging.info("Clearing all destination networks") + del self.destinationNetworks + def destination_network(self, action, name=None, literal=None): """ Adds either an object (name) or a literal to destinationNetworks @@ -255,6 +271,7 @@ def destination_network(self, action, name=None, literal=None): raise ValueError( "Adding source literal and object at the same time not supported" ) + return if not hasattr(self, "destinationNetworks"): self.destinationNetworks = {"objects": [], "literals": []} @@ -264,17 +281,28 @@ def destination_network(self, action, name=None, literal=None): self.destinationNetworks["literals"].append( {"type": literal_type, "value": literal} ) + return - elif action == "add" and name: + if name: new_object = self.find_object(name) + if action == "add": # Check if object is already in the list and if not, then add it - if new_object and new_object not in self.destinationNetworks["objects"]: + if new_object not in self.destinationNetworks["objects"]: logging.info( f'Adding "{name}" to destinationNetworks for prefilter rule' ) self.destinationNetworks["objects"].append(new_object) + elif action == "remove": + index = self.destinationNetworks["objects"].index(new_object) + logging.info(f'Removing "{new_object}" from destinationNetworks') + self.destinationNetworks["objects"].pop(index) + + elif action == "clear": + logging.info("Clearing all destination networks") + del self.destinationNetworks + def find_object(self, name): """ Search through IP net objects, network group objects and FQDN objects by name @@ -363,3 +391,129 @@ def rule_type(self, rule_type): logging.warning( f'Rule type "{rule_type}" is not valid. Must be "PREFILTER" or "TUNNEL"' ) + + def source_port(self, action, name=None, literal=None): + """ + Create the source protocol and ports + Args: + action (str): Add, delete or clear + name (str): Name of port object + literal (dict): Dictionary of protocol and port expressed as integers + """ + logging.debug("In source_port for PreFilterRules") + + if literal and name: + raise ValueError( + "Adding source literal and object at the same time not supported" + ) + return + + if not hasattr(self, "sourcePorts"): + self.sourcePorts = {"literals": [], "objects": []} + + if action == "add" and literal: + if self._port_literal_verify(literal): + literal["type"] = "PortLiteral" + self.sourcePorts["literals"].append(literal) + return + + if name: + port_object = self.find_port_object(name) + + if action == "add": + if port_object and port_object not in self.sourcePorts["objects"]: + logging.info(f'Adding "{port_object}" to sourcePorts') + self.sourcePorts["objects"].append(port_object) + + elif action == "remove": + index = self.sourcePorts["objects"].index(port_object) + logging.info(f'Removing "{port_object}" from sourcePorts') + self.sourcePorts["objects"].pop(index) + + elif action == "clear": + del self.sourcePorts + + def destination_port(self, action, name=None, literal=None): + """ + Create the destination protocol and ports + Args: + action (str): Add, delete or clear + name (str): Name of port object + literal (dict): Dictionary of protocol and port expressed as integers + """ + logging.debug("In destination_port for PreFilterRules") + + if literal and name: + raise ValueError( + "Adding destination literal and object at the same time not supported" + ) + return + + if not hasattr(self, "destinationPorts"): + self.destinationPorts = {"literals": [], "objects": []} + + if action == "add" and literal: + if self._port_literal_verify(literal): + literal["type"] = "PortLiteral" + self.destinationPorts["literals"].append(literal) + return + + if name: + port_object = self.find_port_object(name) + + if action == "add": + if port_object and port_object not in self.destinationPorts["objects"]: + logging.info(f'Adding "{port_object}" to destinationPorts') + self.destinationPorts["objects"].append(port_object) + + elif action == "remove": + index = self.destinationPorts["objects"].index(port_object) + logging.info(f'Removing "{port_object}" from destinationPorts') + self.destinationPorts["objects"].pop(index) + + elif action == "clear": + del self.destinationPorts + + def find_port_object(self, name): + """ + Find port object or port group object and return dictionary + Args: + name (str): Name of port object/port group object + Returns: + Dictionary of port object + """ + protocol_port = ProtocolPortObjects(fmc=self.fmc, name=name) + resp = protocol_port.get() + if "id" in resp.keys(): + return {"name": name, "id": resp["id"], "type": resp["type"]} + + protocol_port_group = PortObjectGroups(fmc=self.fmc, name=name) + resp = protocol_port_group.get() + if "id" in resp.keys(): + return {"name": name, "id": resp["id"], "type": resp["type"]} + + return None + + @staticmethod + def _port_literal_verify(literal): + """ + Ensure that the literal is expressed as integers. It's too hard dealing with named protocols/ports. + Also check that protocol and ports keys are in the dictionary + Args: + literal (dict): Dictionary with protocol and ports + Returns: + Bool + """ + if not isinstance(literal, dict): + logging.warning( + 'Invalid port literal. Must be defined as a dictionary: "{"protocol": , "port": Date: Fri, 29 Nov 2019 16:01:48 +0800 Subject: [PATCH 4/5] Added VLAN tag support --- .../policy_services/prefilterrules.py | 105 +++++++++++++++++- unit_tests/prefilter_rule.py | 30 +++-- 2 files changed, 117 insertions(+), 18 deletions(-) diff --git a/fmcapi/api_objects/policy_services/prefilterrules.py b/fmcapi/api_objects/policy_services/prefilterrules.py index eb3e4d9..b04535a 100644 --- a/fmcapi/api_objects/policy_services/prefilterrules.py +++ b/fmcapi/api_objects/policy_services/prefilterrules.py @@ -7,6 +7,7 @@ from fmcapi.api_objects.object_services.networkaddresses import NetworkAddresses from fmcapi.api_objects.object_services.protocolportobjects import ProtocolPortObjects from fmcapi.api_objects.object_services.portobjectgroups import PortObjectGroups +from fmcapi.api_objects.object_services.vlangrouptags import VlanTags, VlanGroupTags import logging @@ -25,6 +26,7 @@ class PreFilterRules(APIClassTemplate): "destinationInterfaces", "sourcePorts", "destinationPorts", + "vlanTags", "ruleType", "type", "enabled", @@ -242,9 +244,12 @@ def source_network(self, action, name=None, literal=None): if name: new_object = self.find_object(name) + if not new_object: + return + if action == "add": # Check if object is already in the list and if not, then add it - if new_object and new_object not in self.sourceNetworks["objects"]: + if new_object not in self.sourceNetworks["objects"]: logging.info(f'Adding "{name}" to sourceNetworks for prefilter rule') self.sourceNetworks["objects"].append(new_object) @@ -286,6 +291,9 @@ def destination_network(self, action, name=None, literal=None): if name: new_object = self.find_object(name) + if not new_object: + return + if action == "add": # Check if object is already in the list and if not, then add it if new_object not in self.destinationNetworks["objects"]: @@ -323,6 +331,7 @@ def find_object(self, name): if object_id: return {"name": name, "id": object_id, "type": object_type} + logging.warning(f'Unable to find network object "{name}"') return None def find_network_object(self, name): @@ -418,10 +427,13 @@ def source_port(self, action, name=None, literal=None): return if name: - port_object = self.find_port_object(name) + port_object = self._find_port_object(name) + + if not port_object: + return if action == "add": - if port_object and port_object not in self.sourcePorts["objects"]: + if port_object not in self.sourcePorts["objects"]: logging.info(f'Adding "{port_object}" to sourcePorts') self.sourcePorts["objects"].append(port_object) @@ -459,10 +471,13 @@ def destination_port(self, action, name=None, literal=None): return if name: - port_object = self.find_port_object(name) + port_object = self._find_port_object(name) + + if not port_object: + return if action == "add": - if port_object and port_object not in self.destinationPorts["objects"]: + if port_object not in self.destinationPorts["objects"]: logging.info(f'Adding "{port_object}" to destinationPorts') self.destinationPorts["objects"].append(port_object) @@ -474,7 +489,7 @@ def destination_port(self, action, name=None, literal=None): elif action == "clear": del self.destinationPorts - def find_port_object(self, name): + def _find_port_object(self, name): """ Find port object or port group object and return dictionary Args: @@ -492,6 +507,7 @@ def find_port_object(self, name): if "id" in resp.keys(): return {"name": name, "id": resp["id"], "type": resp["type"]} + logging.warning(f'Unable to find port object "{name}"') return None @staticmethod @@ -517,3 +533,80 @@ def _port_literal_verify(literal): return False return True + + def vlan_tags(self, action, name=None, literal=None): + """ + Add, remove or clear VLAN tags + Args: + action (str): Add, remove or clear + name (str): Name of VLAN tag object + literal (str): VLAN tag or range + """ + logging.debug("In vlan_tags() for PreFilterRules") + if literal and name: + raise ValueError( + "Adding/Removing VLAN literal and object at the same time not supported" + ) + return + + if not hasattr(self, "vlanTags"): + self.vlanTags = {"literals": [], "objects": []} + + if action == "add" and literal: + start_vlan, end_vlan = self._vlan_tag_check(literal) + self.vlanTags["literals"].append( + {"startTag": start_vlan, "endTag": end_vlan, "type": "VlanTagLiteral"} + ) + return + + if name: + vlan_object = self._find_vlan_object(name) + + if not vlan_object: + return + + if action == "add": + if vlan_object not in self.vlanTags["objects"]: + self.vlanTags["objects"].append(vlan_object) + + if action == "remove": + index = self.vlanTags["objects"].index(vlan_object) + logging.info(f'Removing "{vlan_object}" from vlanTags') + self.vlanTags["objects"].pop(index) + + elif action == "clear": + del self.vlanTags + + @staticmethod + def _vlan_tag_check(literal): + """ + Check if the VLAN tag literal is a range or not and returns start and end tag number + Args: + literal (str): VLAN tag literal + Returns: + start_vlan (str), end_vlan (str) + """ + if "-" in literal: + vlans = literal.strip("-") + return vlans[0], vlans[1] + else: + return literal, literal + + def _find_vlan_object(self, name): + """ + Find the vlan or vlan group object by name + Args: + name (str): Object name + """ + vlan_object = VlanTags(fmc=self.fmc, name=name) + resp = vlan_object.get() + if "id" in resp.keys(): + return {"name": name, "id": resp["id"], "type": resp["type"]} + + vlan_object = VlanGroupTags(fmc=self.fmc, name=name) + resp = vlan_object.get() + if "id" in resp.keys(): + return {"name": name, "id": resp["id"], "type": resp["type"]} + + logging.warning(f'Unable to find vlan object "{name}"') + return None diff --git a/unit_tests/prefilter_rule.py b/unit_tests/prefilter_rule.py index b13a99a..a98bb36 100644 --- a/unit_tests/prefilter_rule.py +++ b/unit_tests/prefilter_rule.py @@ -41,15 +41,15 @@ def test__prefiler_rule(fmc): ) sec_zone_2.post() port_1 = ProtocolPortObjects( - fmc=fmc, name=f"test_port_1{namer}", port="8443", protocol="TCP" + fmc=fmc, name=f"test_port_1_{namer}", port="8443", protocol="TCP" ) port_1.post() port_2 = ProtocolPortObjects( - fmc=fmc, name=f"test_port_2{namer}", port="161", protocol="UDP" + fmc=fmc, name=f"test_port_2_{namer}", port="161", protocol="UDP" ) port_2.post() port_3 = ProtocolPortObjects( - fmc=fmc, name=f"test_port_3{namer}", port="0-1023", protocol="TCP" + fmc=fmc, name=f"test_port_3_{namer}", port="0-1023", protocol="TCP" ) port_3.post() time.sleep(1) @@ -88,15 +88,21 @@ def test__prefiler_rule(fmc): prefilter_rule_2 = PreFilterRules(fmc=fmc, prefilter_name=namer) prefilter_rule_2.name = "test_2" prefilter_rule_2.enabled = True - prefilter_rule_2.action = 'FASTPATH' - prefilter_rule_2.source_port(action='add', literal={'protocol': '6', 'port': '22'}) - prefilter_rule_2.source_port(action='add', literal={'protocol': '6', 'port': '443'}) - prefilter_rule_2.source_port(action='add', literal={'protocol': '17', 'port': '53'}) - prefilter_rule_2.destination_port(action='add', name=f"test_port_1{namer}") - prefilter_rule_2.destination_port(action='add', name=f"test_port_2{namer}") - prefilter_rule_2.destination_port(action='add', name=f"test_port_3{namer}") - prefilter_rule_2.destination_port(action='remove', name=f"test_port_3{namer}") - prefilter_rule_2.destination_port(action='add', name=f'port_group_1_{namer}') + prefilter_rule_2.action = "FASTPATH" + prefilter_rule_2.source_port(action="add", literal={"protocol": "6", "port": "22"}) + prefilter_rule_2.source_port(action="add", literal={"protocol": "6", "port": "443"}) + prefilter_rule_2.source_port(action="add", literal={"protocol": "17", "port": "53"}) + prefilter_rule_2.destination_port(action="add", name=f"test_port_1_{namer}") + prefilter_rule_2.destination_port(action="add", name=f"test_port_2_{namer}") + prefilter_rule_2.destination_port(action="add", name=f"test_port_3_{namer}") + prefilter_rule_2.destination_port(action="remove", name=f"test_port_3{namer}") + prefilter_rule_2.destination_port(action="add", name=f"port_group_1_{namer}") + prefilter_rule_2.vlan_tags(action="add", literal="100-110") + prefilter_rule_2.vlan_tags(action="add", name=f"vlan_tag_1_{namer}") + prefilter_rule_2.vlan_tags(action="add", name=f"vlan_tag_2_{namer}") + prefilter_rule_2.vlan_tags(action="add", name=f"vlan_tag_3_{namer}") + prefilter_rule_2.vlan_tags(action="remove", name=f"vlan_tag_2_{namer}") + prefilter_rule_2.vlan_tags(action="add", name=f"vlan_group_1_{namer}") prefilter_rule_2.post() time.sleep(1) From f55747142edb427db2d8fbf5aa78225e7b69d028 Mon Sep 17 00:00:00 2001 From: bekelly Date: Fri, 29 Nov 2019 16:05:11 +0800 Subject: [PATCH 5/5] Completed prefilter tests --- unit_tests/prefilter_rule.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/unit_tests/prefilter_rule.py b/unit_tests/prefilter_rule.py index a98bb36..085c80e 100644 --- a/unit_tests/prefilter_rule.py +++ b/unit_tests/prefilter_rule.py @@ -11,6 +11,8 @@ SecurityZones, PortObjectGroups, ProtocolPortObjects, + VlanGroupTags, + VlanTags, ) @@ -59,6 +61,19 @@ def test__prefiler_rule(fmc): port_group_1.named_ports(action="add", name=port_3.name) port_group_1.post() + vlan_tag_1 = VlanTags(fmc=fmc, name=f"vlan_tag_1_{namer}") + vlan_tag_1.vlans(start_vlan="1", end_vlan="9") + vlan_tag_1.post() + vlan_tag_2 = VlanTags(fmc=fmc, name=f"vlan_tag_2_{namer}") + vlan_tag_2.vlans(start_vlan="10", end_vlan="19") + vlan_tag_2.post() + vlan_tag_3 = VlanTags(fmc=fmc, name=f"vlan_tag_3_{namer}") + vlan_tag_3.vlans(start_vlan="20", end_vlan="29") + vlan_tag_3.post() + vlan_group_1 = VlanGroupTags(fmc=fmc, name=f"vlan_group_1_{namer}") + vlan_group_1.named_vlantags(action="add", name=f"vlan_tag_3_{namer}") + vlan_group_1.post() + logging.info(f'Creating test prefiler "{namer}"') prefilter = PreFilterPolicies(fmc=fmc, name=namer) prefilter.post() @@ -118,3 +133,7 @@ def test__prefiler_rule(fmc): port_1.delete() port_2.delete() port_3.delete() + vlan_group_1.delete() + vlan_tag_1.delete() + vlan_tag_2.delete() + vlan_tag_3.delete()