Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added initial support for cdFMC #203

Merged
merged 7 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ Then to use the code best start a "with" statement that creates an instance of t
Then either code away referencing the fmc variable to get to the internal methods of the FMC class **or** utilize
the various class objects to ease your coding needs.

If you are using cdFMC, your api requests will need to proxied to that cdFMC via CDO. The authentication mechanisms used here are different than on-prem FMC as it looks like CDO's webserver is proxying these api requests to "cloud" FMC. Thankfully, it seems like aside from authentication, cdFMC has all the same api endpoints available. Follow the instructions [here](https://www.cisco.com/c/en/us/td/docs/security/firepower/730/Rapid-Release/API/CDO/cloud_delivered_firewall_management_center_rest_api_quick_start_guide/Connecting_With_A_Client.html) to create a CDO user capable of using the CDO api. This process will give you a JWT token which is what will be used for authenticating to CDO api, even when sending api requests to the cdFMC api.

`with fmcapi.FMC(host='examplecompany.app.us.cdo.cisco.com', cdfmc=True, api_key=$JWT-TOKEN-FROM-CDO-API-USER autodeploy=False) as fmc:`

NOTE: This JWT token is NOT the token used when utilizing the cdFMC api-explorer. It looks like the authentication for the cdFMC api-explorer is handled under the hood by CDO authenticating the CDO user and then just using a embedded "api-explorer" user that is cisco managed. This probably also means that cdFMC api-explorer can only have one session since any CDO user that goes to the cdFMC api-explorer will just get a new session of the cdFMC user 'api-explorer' thus logging any other existing 'api-explorer' user out.
``

Building out an example network is in the "example" directory. This isn't fully completed but it should help you get
an idea of what is possible.

Expand Down
132 changes: 127 additions & 5 deletions fmcapi/api_objects/apiclasstemplate.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Super class(es) that is inherited by all API objects."""
from .helper_functions import syntax_correcter
from .helper_functions import syntax_correcter, bulk_list_splitter, check_uuid
import logging
import json

Expand All @@ -14,6 +14,8 @@ class APIClassTemplate(object):
REQUIRED_FOR_DELETE = ["id"]
REQUIRED_FOR_GET = [""]
REQUIRED_GET_FILTERS = []
REQUIRED_FOR_BULK_POST = ["bulk"]
REQUIRED_FOR_BULK_DELETE = ["bulk"]
FILTER_BY_NAME = False
URL = ""
URL_SUFFIX = ""
Expand Down Expand Up @@ -277,6 +279,17 @@ def valid_for_post(self):
:return: (boolean)
"""
logging.debug("In valid_for_post() for APIClassTemplate class.")
if "bulk_post_data" in self.__dict__:
missing_required_item = False
# Check within each payload to ensure required for post is handled
for i in self.bulk_post_data:
for item in self.REQUIRED_FOR_POST:
if item not in i:
logging.error(f'BULK POST FAILED: Missing value "{item}" in {i}')
missing_required_item = True
if missing_required_item:
return False
return True
for item in self.REQUIRED_FOR_POST:
if item not in self.__dict__:
logging.error(f'Missing value "{item}" for POST request.')
Expand All @@ -303,23 +316,35 @@ def post(self, **kwargs):
self.put()
else:
if self.valid_for_post():
if "bulk_post_data" in self.__dict__:
url = f"{self.URL}?bulk=true"
else:
url = f"{self.URL}"
if self.dry_run:
logging.info(
"Dry Run enabled. Not actually sending to FMC. Here is what would have been sent:"
)
logging.info("\tMethod = POST")
logging.info(f"\tURL = {self.URL}")
logging.info(f"\tURL = {url}")
logging.info(f"\tJSON = {self.show_json}")
return False
response = self.fmc.send_to_api(
method="post", url=self.URL, json_data=self.format_data()
)
if "bulk_post_data" in self.__dict__:
response = self.fmc.send_to_api(
method="post", url=url, json_data=self.bulk_post_data
)
else:
response = self.fmc.send_to_api(
method="post", url=url, json_data=self.format_data()
)
if response:
self.parse_kwargs(**response)
if "name" in self.__dict__ and "id" in self.__dict__:
logging.info(
f'POST success. Object with name: "{self.name}" and id: "{self.id}" created in FMC.'
)
elif "bulk_post_data" in self.__dict__:
logging.info(f'BULK POST success. Items bulk posted: {len(response["items"])}')
logging.debug(f'BULK POST: {response["items"]}')
else:
logging.debug(
'POST success but no "id" or "name" values in API response.'
Expand Down Expand Up @@ -395,6 +420,15 @@ def valid_for_delete(self):
:return: (boolean)
"""
logging.debug("In valid_for_delete() for APIClassTemplate class.")
if "bulk_delete_data" in self.__dict__:
# Validate bulk delete data is a list of valid ids
valid = True
for i in self.bulk_delete_data:
if not check_uuid(i):
valid = False
if not valid:
return False
return True
for item in self.REQUIRED_FOR_DELETE:
if item not in self.__dict__:
logging.error(f'Missing value "{item}" for DELETE request.')
Expand All @@ -419,6 +453,10 @@ def delete(self, **kwargs):
url = f"{self.URL}/{self.targetId}"
if "backupVersion" in self.__dict__:
url += f"?backupVersion={self.backupVersion}"
elif "bulk_delete_data" in self.__dict__:
# Convert bulk delete data to csv string to insert into url
self.bulk_delete_str = ','.join(map(str,self.bulk_delete_data))
url = f"{self.URL}?filter=ids:{self.bulk_delete_str}&bulk=true"
else:
url = f"{self.URL}/{self.id}"
if self.dry_run:
Expand Down Expand Up @@ -448,6 +486,9 @@ def delete(self, **kwargs):
logging.info(
f'DELETE success. Object with targetId: "{self.targetId}" deleted from FMC.'
)
elif "bulk_delete_data" in self.__dict__:
logging.info(f'Bulk DELETE success. Objects deleted in FMC: {len(self.bulk_delete_data)}')
logging.debug(f'Bulk DELETE: {self.bulk_delete_data}')
else:
logging.info(f'DELETE success. Object id: "{self.id}" deleted in FMC.')
return response
Expand All @@ -456,3 +497,84 @@ def delete(self, **kwargs):
"delete() method failed due to failure to pass valid_for_delete() test."
)
return False

def valid_for_bulk_delete(self):
"""
Use REQUIRED_FOR_BULK_DELETE to ensure all necessary variables exist prior to submitting to API.

:return: (boolean)
"""
logging.debug("In valid_for_bulk_delete() for APIClassTemplate class.")

for item in self.REQUIRED_FOR_BULK_DELETE:
if item not in self.__dict__:
logging.error(f'Missing value "{item}" for bulk DELETE request.')
return False
return True

def bulk_delete(self, **kwargs):
"""
This is a shim in front of the normal delete() to handle bulk deletes.

"""
logging.debug("In bulk_delete() for APIClassTemplate class.")
self.parse_kwargs(**kwargs)
if self.fmc.serverVersion < self.FIRST_SUPPORTED_FMC_VERSION:
logging.error(
f"Your FMC version, {self.fmc.serverVersion} does not support bulk DELETE of this feature."
)
return False
if self.valid_for_bulk_delete():
if len(self.bulk) > 0:
if len(self.bulk) > 49:
self.chunks = bulk_list_splitter(self.bulk)
for chunk in self.chunks:
self.bulk_delete_data = chunk
# self.ids_str = ','.join(map(str,self.ids))
APIClassTemplate.delete(self)
else:
self.bulk_delete_data = self.bulk
# self.ids_str = ','.join(map(str,self.ids))
APIClassTemplate.delete(self)

def valid_for_bulk_post(self):
"""
Use REQUIRED_FOR_BULK_POST to ensure all necessary variables exist prior to submitting to API.

:return: (boolean)
"""
logging.debug("In valid_for_bulk_post() for APIClassTemplate class.")

for item in self.REQUIRED_FOR_BULK_POST:
if item not in self.__dict__:
logging.error(f'Missing value "{item}" for bulk POST request.')
return False
return True

def bulk_post(self, **kwargs):
"""
This is a shim in front of the normal post() to handle bulk posts.

"""
logging.debug("In bulk_post() for APIClassTemplate class.")
self.parse_kwargs(**kwargs)
if self.fmc.serverVersion < self.FIRST_SUPPORTED_FMC_VERSION:
logging.error(
f"Your FMC version, {self.fmc.serverVersion} does not support bulk POST of this feature."
)
return False
if self.valid_for_bulk_post():
self.bulk_ids = []
if len(self.bulk) > 0:
if len(self.bulk) > 49:
self.chunks = bulk_list_splitter(self.bulk)
for chunk in self.chunks:
self.bulk_post_data = chunk
response = APIClassTemplate.post(self)
for i in response['items']:
self.bulk_ids.append(i['id'])
else:
self.bulk_post_data = self.bulk
response = APIClassTemplate.post(self)
for i in response['items']:
self.bulk_ids.append(i['id'])
26 changes: 26 additions & 0 deletions fmcapi/api_objects/helper_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import ipaddress
import json
import logging
import uuid


logging.debug(f"In the {__name__} module.")

Expand Down Expand Up @@ -166,3 +168,27 @@ def validate_vlans(start_vlan, end_vlan=""):
return start_vlan, end_vlan
else:
return 1, 4094


def bulk_list_splitter(ids, chunk_size=49):
"""_summary_

Args:
ids (list): list of ids used in bulk post/delete operations
chunk_size (int, optional): bulk operations seem to be limited to 49 max ids in one url. Defaults to 49.

Returns:
list: list of lists where each inner list is 49 items
"""
chunks = []
for id in range(0, len(ids), chunk_size):
chunks.append(ids[id:id + chunk_size])
return chunks

def check_uuid(uuid_input):
try:
uuid.UUID(str(uuid_input))
return True
except ValueError:
return False

33 changes: 28 additions & 5 deletions fmcapi/fmc.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def __init__(
timeout=5,
wait_time=15,
api_key=None,
cdfmc=False,
uuid=None,
):
"""
Expand Down Expand Up @@ -121,6 +122,7 @@ def __init__(
self.error_response = None
self.wait_time = wait_time
self.api_key = api_key
self.cdfmc = cdfmc
self.uuid = uuid

def __enter__(self):
Expand All @@ -145,7 +147,22 @@ def __enter__(self):
self.uuid = self.mytoken.uuid

else:
if self.uuid is None:
if self.cdfmc:
logging.debug("cdFMC is True.")
# cdFMC doesn't use the normal user/pass auth that responds with the token and global uuid
# initial lookup of the global domain uuid done here using the JWT token from cdo api user
logging.debug(f'Fetching cdFMC global domain uuid.')
domain_info = self.send_to_api(method="get",url=f"https://{self.host}/api/fmc_platform/v1/info/domain")
logging.debug(domain_info)
if domain_info is not None:
for i in domain_info['items']:
if i['name'] == 'Global':
self.uuid = i['uuid']
logging.debug(f'cdFMC global uuid found! {self.uuid}')
else:
logging.error(f"Unable to retrieve global domain UUID from cdFMC")
logging.error(domain_info)
elif self.uuid is None:
logging.error("If using an API_KEY, you must provide a UUID")
exit(1)

Expand Down Expand Up @@ -205,10 +222,16 @@ def send_to_api(
self.more_items = []
self.page_counter = 0
if self.api_key is not None:
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.api_key}",
}
if self.cdfmc:
headers = {
"accept": "application/json",
"Authorization": f"Bearer {self.api_key}",
}
else:
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.api_key}",
}
else:
if headers == "":
# These values for headers works for most API requests.
Expand Down
2 changes: 2 additions & 0 deletions unit_tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
from .connectionprofiles import test__connectionprofiles
from .dynamicaccesspolicies import test__dynamicaccesspolicies
from .timeranges import test__timeranges
from .fqdns import test__fqdns

logging.debug("In the unit-tests __init__.py file.")

Expand Down Expand Up @@ -160,4 +161,5 @@
"test__connectionprofiles",
"test__dynamicaccesspolicies",
"test__timeranges",
"test__fqdns"
]
32 changes: 32 additions & 0 deletions unit_tests/fqdns.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import logging
import fmcapi
import time
from .helper_functions import id_generator


def test__fqdns(fmc):
Expand All @@ -20,4 +22,34 @@ def test__fqdns(fmc):

obj1.delete()

obj = fmcapi.FQDNS(fmc=fmc)
obj.bulk = []
for i in range(10):
obj.bulk.append(
{
"name" : f"_fmcapi_test_{id_generator()}",
"value" : "www.cisco.com",
"dnsResolution" : "IPV4_ONLY"
}
)
obj.bulk_post()
obj.bulk = obj.bulk_ids
obj.bulk_delete()
del obj

obj = fmcapi.FQDNS(fmc=fmc)
obj.bulk = []
for i in range(50):
obj.bulk.append(
{
"name" : f"_fmcapi_test_{id_generator()}",
"value" : "www.cisco.com",
"dnsResolution" : "IPV4_ONLY"
}
)
obj.bulk_post()
obj.bulk = obj.bulk_ids
obj.bulk_delete()
del obj

logging.info("FQDNS DNSServerGroups class done.\n")
Loading