Skip to content

Commit

Permalink
Merge branch 'marksull:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
AmrithaNagarajan authored Jan 29, 2024
2 parents 92f8974 + dd0e9a4 commit f120293
Show file tree
Hide file tree
Showing 10 changed files with 353 additions and 89 deletions.
102 changes: 51 additions & 51 deletions TestingUserScript.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,58 +31,58 @@ def main():
logging.info("# ### Mega Test Start!!! ### #\n\n")
unit_tests.test__fmc_version(fmc=fmc1)

"""
# Working Tests
unit_tests.test__vlan_group_tag(fmc=fmc1)
unit_tests.test__deployment_requests(fmc=fmc1)
unit_tests.test__audit_records(fmc=fmc1)
unit_tests.test__deployable_devices(fmc=fmc1)
unit_tests.test__devicegrouprecords(fmc=fmc1)

unit_tests.test__application_type(fmc=fmc1)
unit_tests.test__application_tag(fmc=fmc1)
unit_tests.test__application(fmc=fmc1)
unit_tests.test__application_risk(fmc=fmc1)
unit_tests.test__application_filter(fmc=fmc1)
unit_tests.test__application_productivity(fmc=fmc1)
unit_tests.test__application_category(fmc=fmc1)
unit_tests.test__cert_enrollment(fmc=fmc1)
unit_tests.test__country(fmc=fmc1)
unit_tests.test__filepolicies(fmc=fmc1)
unit_tests.test__continent(fmc=fmc1)
unit_tests.test__dns_servers_group(fmc=fmc1)
unit_tests.test__url_group(fmc=fmc1)
unit_tests.test__network_group(fmc=fmc1)
unit_tests.test__ip_addresses(fmc=fmc1)
unit_tests.test__variable_set(fmc=fmc1)
unit_tests.test__ip_host(fmc=fmc1)
unit_tests.test__ip_network(fmc=fmc1)
unit_tests.test__ip_range(fmc=fmc1)
unit_tests.test__extended_acls(fmc=fmc1)
unit_tests.test__geolocations(fmc=fmc1)
unit_tests.test__icmpv4(fmc=fmc1)
unit_tests.test__icmpv6(fmc=fmc1)
unit_tests.test__ikev1(fmc=fmc1)
unit_tests.test__ikev2(fmc=fmc1)
unit_tests.test__url(fmc=fmc1)
unit_tests.test__vlan_tag(fmc=fmc1)
unit_tests.test__protocol_port(fmc=fmc1)
unit_tests.test__security_zone(fmc=fmc1)
unit_tests.test__slamonitor(fmc=fmc1)
unit_tests.test__intrusion_policy(fmc=fmc1)
unit_tests.test__access_control_policy(fmc=fmc1)
unit_tests.test__acp_rule(fmc=fmc1)
unit_tests.test__port_object_group(fmc=fmc1)
unit_tests.test__url_category(fmc=fmc1)
unit_tests.test__ports(fmc=fmc1)
unit_tests.test__autonat(fmc=fmc1)
unit_tests.test__manualnat(fmc=fmc1)
unit_tests.test__backup(fmc=fmc1) # Delete needs existing backup and may need an increased fmc timeout for large backups
unit_tests.test__objects_get_query_filters(fmc=fmc1)
unit_tests.test__usage(fmc=fmc1)
unit_tests.test__dynamic_objects(fmc=fmc1)
unit_tests.test__dynamic_objects_mappings(fmc=fmc1)
"""
# # Working Tests
# unit_tests.test__vlan_group_tag(fmc=fmc1)
# unit_tests.test__deployment_requests(fmc=fmc1)
# unit_tests.test__audit_records(fmc=fmc1)
# unit_tests.test__deployable_devices(fmc=fmc1)
# unit_tests.test__devicegrouprecords(fmc=fmc1)
#
# unit_tests.test__application_type(fmc=fmc1)
# unit_tests.test__application_tag(fmc=fmc1)
# unit_tests.test__application(fmc=fmc1)
# unit_tests.test__application_risk(fmc=fmc1)
# unit_tests.test__application_filter(fmc=fmc1)
# unit_tests.test__application_productivity(fmc=fmc1)
# unit_tests.test__application_category(fmc=fmc1)
# unit_tests.test__cert_enrollment(fmc=fmc1)
# unit_tests.test__country(fmc=fmc1)
# unit_tests.test__filepolicies(fmc=fmc1)
# unit_tests.test__continent(fmc=fmc1)
# unit_tests.test__dns_servers_group(fmc=fmc1)
# unit_tests.test__url_group(fmc=fmc1)
# unit_tests.test__network_group(fmc=fmc1)
# unit_tests.test__ip_addresses(fmc=fmc1)
# unit_tests.test__variable_set(fmc=fmc1)
# unit_tests.test__ip_host(fmc=fmc1)
# unit_tests.test__ip_network(fmc=fmc1)
# unit_tests.test__ip_range(fmc=fmc1)
# unit_tests.test__extended_acls(fmc=fmc1)
# unit_tests.test__geolocations(fmc=fmc1)
# unit_tests.test__icmpv4(fmc=fmc1)
# unit_tests.test__icmpv6(fmc=fmc1)
# unit_tests.test__ikev1(fmc=fmc1)
# unit_tests.test__ikev2(fmc=fmc1)
# unit_tests.test__url(fmc=fmc1)
# unit_tests.test__vlan_tag(fmc=fmc1)
# unit_tests.test__protocol_port(fmc=fmc1)
# unit_tests.test__security_zone(fmc=fmc1)
# unit_tests.test__slamonitor(fmc=fmc1)
# unit_tests.test__intrusion_policy(fmc=fmc1)
# unit_tests.test__access_control_policy(fmc=fmc1)
# unit_tests.test__acp_rule(fmc=fmc1)
# unit_tests.test__port_object_group(fmc=fmc1)
# unit_tests.test__url_category(fmc=fmc1)
# unit_tests.test__ports(fmc=fmc1)
# unit_tests.test__autonat(fmc=fmc1)
# unit_tests.test__manualnat(fmc=fmc1)
# unit_tests.test__backup(fmc=fmc1) # Delete needs existing backup and may need an increased fmc timeout for large backups
# unit_tests.test__objects_get_query_filters(fmc=fmc1)
# unit_tests.test__usage(fmc=fmc1)
# unit_tests.test__dynamic_objects(fmc=fmc1)
# unit_tests.test__dynamic_objects_mappings(fmc=fmc1)



"""
Expand Down
2 changes: 2 additions & 0 deletions fmcapi/api_objects/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from .device_services.redundantinterfaces import RedundantInterfaces
from .device_services.staticroutes import StaticRoutes
from .device_services.subinterfaces import SubInterfaces
from .health.terminateravpnsessions import TerminateRAVPNSessions
from .object_services.anyprotocolportobjects import AnyProtocolPortObjects
from .object_services.applicationcategories import ApplicationCategories
from .object_services.applicationfilters import ApplicationFilters
Expand Down Expand Up @@ -206,4 +207,5 @@
"Usage",
"DynamicObject",
"DynamicObjectMappings",
"TerminateRAVPNSessions",
]
8 changes: 8 additions & 0 deletions fmcapi/api_objects/health/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
"""Health Classes."""

import logging
from .terminateravpnsessions import TerminateRAVPNSessions

logging.debug("In the health __init__.py file.")

__all__ = ["Health"]
50 changes: 50 additions & 0 deletions fmcapi/api_objects/health/terminateravpnsessions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from fmcapi.api_objects.apiclasstemplate import APIClassTemplate
import logging


class TerminateRAVPNSessions(APIClassTemplate):
"""The TerminateRAVPNSessions Object in the FMC."""

FIRST_SUPPORTED_FMC_VERSION = "7.3"
VALID_JSON_DATA = [
"id",
"name",
"description",
"usernames",
"type",
"deviceId",
"terminateBy",
"sessionIds",
"version",
]
VALID_FOR_KWARGS = VALID_JSON_DATA + []
REQUIRED_FOR_POST = ["terminateBy"]

URL_SUFFIX = "/health/ravpnsessions/operational/terminateravpnsessions"

def __init__(self, fmc, **kwargs):
"""
Initialize TerminateRAVPNSessions object.
:param fmc (object): FMC object
:param **kwargs: Any other values passed during instantiation.
:return: requests response
"""
super().__init__(fmc, **kwargs)
logging.debug("In __init__() for TerminateRAVPNSessions class.")
self.parse_kwargs(**kwargs)

def get(self, **kwargs):
"""GET method for API for TerminateRAVPNSessions not supported."""
logging.info("GET method for API for TerminateRAVPNSessions not supported.")
pass

def delete(self, **kwargs):
"""DELETE method for API for TerminateRAVPNSessions not supported."""
logging.info("DELETE method for API for TerminateRAVPNSessions not supported.")
pass

def put(self):
"""PUT method for API for TerminateRAVPNSessions not supported."""
logging.info("PUT method for API for TerminateRAVPNSessions not supported.")
pass
155 changes: 152 additions & 3 deletions fmcapi/api_objects/object_services/dynamicobjectmappings.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
class DynamicObjectMappings(APIClassTemplate):
"""The Dynamic Object in the FMC."""

VALID_JSON_DATA = []
VALID_JSON_DATA = ["add", "remove", "mappings", "type", "id", "name"]
VALID_FOR_KWARGS = VALID_JSON_DATA + []
URL_SUFFIX = "/object/dynamicobjectmappings"
VALID_CHARACTERS_FOR_NAME = """[.\w\d_\- ]"""
Expand All @@ -30,7 +30,7 @@ def __init__(self, fmc, **kwargs):
if "id" in kwargs:
self.obj_id = kwargs["id"]

def mappings(self, action, value, name):
def handle_mappings(self, action, value, name):
"""
Associate mappings to dynamic object.
Expand Down Expand Up @@ -74,6 +74,155 @@ def mappings(self, action, value, name):
)

def get(self):
self.URL_SUFFIX = f"/object/dynamicobjects/{str(self.obj_id)}/mappings"
self.__init__(self.fmc)
return self._get()

def put(self):
self.URL_SUFFIX = f"/object/dynamicobjects/{str(self.obj_id)}/mappings"
super().__init__(self.fmc)
return super().put()

def delete(self):
self.URL_SUFFIX = f"/object/dynamicobjects/{str(self.obj_id)}/mappings"
super().__init__(self.fmc)
return super().get()
return super().delete()

def _get(self, **kwargs):
"""
To get mappings url is GET /api/fmc_config/v1/domain/{domainUUID}/object/dynamicobjects/{objectId}/mappings
"""
logging.debug("In get() for APIClassTemplate class.")
self.parse_kwargs(**kwargs)
if self.fmc.serverVersion < self.FIRST_SUPPORTED_FMC_VERSION:
logging.error(
f"Your FMC version, {self.fmc.serverVersion} does not support GET of this feature."
)
return {"items": []}
if self.valid_for_get():
if "id" in self.__dict__ or "targetId" in self.__dict__:
if "targetId" in self.__dict__:
url = f"{self.URL}"
if "backupVersion" in self.__dict__:
url += f"?backupVersion={self.backupVersion}"
else:
url = f"{self.URL}"
if self.dry_run:
logging.info(
"Dry Run enabled. Not actually sending to FMC. Here is what would have been sent:"
)
logging.info("\tMethod = GET")
logging.info(f"\tURL = {self.URL}")
return False
response = self.fmc.send_to_api(method="get", url=url)
try:
self.parse_kwargs(**response)
except TypeError as e:
logging.error(
f"Response from FMC GET with 'id', {self.id}, returned none."
f"That 'id' probably was deleted. Error: {e}."
)
if "name" in self.__dict__:
logging.info(
f'GET success. Object with name: "{self.name}" and id: "{self.id}" fetched from FMC.'
)
elif "targetId" in self.__dict__:
if "backupVersion" in self.__dict__:
logging.info(
f'GET success. Object with targetId: "{self.targetId}" backupVersion: "{self.backupVersion}" fetched from FMC.'
)
logging.info(
f'GET success. Object with targetId: "{self.targetId}" fetched from FMC.'
)
else:
logging.info(
f'GET success. Object with id: "{self.id}" fetched from FMC.'
)
elif "name" in self.__dict__:
if self.FILTER_BY_NAME:
url = f"{self.URL}?name={self.name}&expanded=true"
else:
url = f"{self.URL}?expanded=true"
if "limit" in self.__dict__:
url = f"{url}&limit={self.limit}"
if "offset" in self.__dict__:
url = f"{url}&offset={self.offset}"
response = self.fmc.send_to_api(method="get", url=url)
if "items" not in response:
response["items"] = []
for item in response["items"]:
if "name" in item:
if item["name"] == self.name:
self.id = item["id"]
self.parse_kwargs(**item)
logging.info(
f'GET success. Object with name: "{self.name}" and id: "{self.id}" '
f"fetched from FMC."
)
return item
else:
logging.warning(
f'No "name" attribute associated with this item to check against {self.name}.'
)
if "id" not in self.__dict__:
logging.warning(f"\tGET query for {self.name} is not found.")
logging.debug(
f"\tGET query for {self.name} is not found.\n\t\tResponse: {json.dumps(response)}"
)
elif len(self.get_filters) > 0:
url_filter = ""
for key, value in self.get_filters.items():
# Filter value must not be empty otherwise will result in a 400 response
if value != "":
url_filter += f"{key}%3A{value};"
else:
logging.warning(
f"Terminating GET - {self.URL}?expanded={self.expanded}&filter={url_filter}"
)
logging.warning(f"{key} MUST have a non empty value")
return False
url = f"{self.URL}?expanded={self.expanded}&filter={url_filter}"
if self.dry_run:
logging.info(
"Dry Run enabled. Not actually sending to FMC. Here is what would have been sent:"
)
logging.info("\tMethod = GET")
logging.info(f"\tURL = {url}")
return False
response = self.fmc.send_to_api(method="get", url=url)
if "items" not in response:
logging.info(
f"GET success. No Objects were found with query filter: {self.get_filters}"
)
return response
else:
response_count = response.get("paging").get("count")
logging.info(
f"GET success. {response_count} items found that match query filter: {self.get_filters}"
)
return response
else:
logging.debug(
"GET query for object with no name or id set. "
"Returning full list of these object types instead."
)
url_suffix_start = "?"
if url_suffix_start in self.URL:
url_suffix_start = "&"
url = f"{self.URL}{url_suffix_start}expanded=true&limit={self.limit}"
if self.dry_run:
logging.info(
"Dry Run enabled. Not actually sending to FMC. Here is what would have been sent:"
)
logging.info("\tMethod = GET")
logging.info(f"\tURL = {self.URL}")
return False
response = self.fmc.send_to_api(method="get", url=url)
if "items" not in response:
response["items"] = []
return response
else:
logging.warning(
"get() method failed due to failure to pass valid_for_get() test."
)
return False
Loading

0 comments on commit f120293

Please sign in to comment.