Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Device Config backup workflow module #382

Merged
288 changes: 284 additions & 4 deletions plugins/module_utils/dnac.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import re
import socket
import time
import traceback


class DnacBase():
Expand Down Expand Up @@ -542,6 +543,107 @@ def check_string_dictionary(self, task_details_data):
except json.JSONDecodeError:
pass
return None

def get_device_ip_from_device_id(self, site_id):
"""
Retrieve the management IP addresses and their corresponding instance UUIDs of devices associated with a specific site in Cisco Catalyst Center.

Args:
site_id (str): The ID of the site to be retrieved.

Returns:
dict: A dictionary mapping management IP addresses to their instance UUIDs, or an empty dict if no devices found.
"""

mgmt_ip_to_instance_id_map = {}

try:
response = self.get_device_ids_from_site(site_id)

if not response:
raise ValueError("No response received from get_device_ids_from_site")

self.log("Received API response from 'get_device_ids_from_site': {0}".format(str(response)), "DEBUG")

for device_id in response:
device_response = self.dnac._exec(
family="devices",
function="get_device_by_id",
op_modifies=True,
params={"id": device_id}
)

management_ip = device_response.get("response", {}).get("managementIpAddress")
instance_uuid = device_response.get("response", {}).get("instanceUuid")
if management_ip and instance_uuid:
mgmt_ip_to_instance_id_map[management_ip] = instance_uuid
else:
self.log("Management IP or instance UUID not found for device ID: {0}".format(device_id), "WARNING")

except Exception as e:
self.log("Unable to fetch the device(s) associated with the site '{0}' due to {1}".format(site_id, str(e)), "ERROR")
return {}

if not mgmt_ip_to_instance_id_map:
self.log("No reachable devices found at Site: {0}".format(site_id), "INFO")

return mgmt_ip_to_instance_id_map

def get_device_ids_from_site(self, site_id):
"""
Retrieve device IDs associated with a specific site in Cisco Catalyst Center.

Args:
site_id (str): The unique identifier of the site.

Returns:
list: A list of device IDs associated with the site.
Returns an empty list if no devices are found or if an error occurs.
"""

device_ids = []

if self.dnac_version <= self.version_2_3_5_3:
try:
response = self.dnac._exec(
family="sites",
function="get_membership",
op_modifies=True,
params={"site_id": site_id},
)

if response and "device" in response:
for device in response.get("device", []):
for item in device.get("response", []):
device_ids.append(item.get("instanceUuid"))

self.log("Retrieved device IDs from membership for site '{0}': {1}".format(site_id, device_ids), "DEBUG")

except Exception as e:
self.log("Error retrieving device IDs from membership for site '{0}': {1}".format(site_id, str(e)), "ERROR")

else:
try:
response = self.dnac._exec(
family="site_design",
function="get_site_assigned_network_devices",
op_modifies=True,
params={"site_id": site_id},
)

if response and "response" in response:
for device in response.get("response", []):
device_ids.append(device.get("deviceId"))

self.log("Retrieved device IDs from assigned devices for site '{0}': {1}".format(site_id, device_ids), "DEBUG")

except Exception as e:
self.log("Error retrieving device IDs from assigned devices for site '{0}': {1}".format(site_id, str(e)), "ERROR")

if not device_ids:
self.log("No devices found for site '{0}'".format(site_id), "INFO")

return device_ids

def get_site_id(self, site_name):
"""
Expand Down Expand Up @@ -1001,7 +1103,7 @@ def get_task_details_by_id(self, task_id):
Call the API 'get_task_details_by_id' to get the details along with the
failure reason. Return the details.
"""

# Need to handle exception
task_details = None
response = self.dnac._exec(
family="task",
Expand Down Expand Up @@ -1030,7 +1132,7 @@ def get_tasks_by_id(self, task_id):
Call the API 'get_tasks_by_id' to get the status of the task.
Return the details along with the status of the task.
"""

# Need to handle exception
task_status = None
response = self.dnac._exec(
family="task",
Expand All @@ -1051,7 +1153,7 @@ def get_tasks_by_id(self, task_id):

def check_tasks_response_status(self, response, api_name):
"""
Get the site id from the site name.
Get the task response status from taskId

Parameters:
self: The current object details.
Expand Down Expand Up @@ -1115,6 +1217,184 @@ def check_tasks_response_status(self, response, api_name):

return self

def set_operation_result(self, operation_status, is_changed, status_message, log_level, additional_info=None):
"""
Update the result of the operation with the provided status, message, and log level.
Parameters:
- operation_status (str): The status of the operation ("success" or "failed").
- is_changed (bool): Indicates whether the operation caused changes.
- status_message (str): The message describing the result of the operation.
- log_level (str): The log level at which the message should be logged ("INFO", "ERROR", "CRITICAL", etc.).
- additional_info (dict, optional): Additional data related to the operation result.
Returns:
self (object): An instance of the class.
Note:
- If the status is "failed", the "failed" key in the result dictionary will be set to True.
- If data is provided, it will be included in the result dictionary.
"""
# Update the result attributes with the provided values
self.status = operation_status
self.result.update({
"status": operation_status,
"msg": status_message,
"response": status_message,
"changed": is_changed,
"failed": operation_status == "failed",
"data": additional_info or {} # Include additional_info if provided, else an empty dictionary
})

# Log the message at the specified log level
self.log(status_message, log_level)

return self

def fail_and_exit(self, msg):
"""Helper method to update the result as failed and exit."""
self.set_operation_result("failed", False, msg, "ERROR")
self.check_return_status()

def log_traceback(self):
"""
Logs the full traceback of the current exception.
"""
# Capture the full traceback
full_traceback = traceback.format_exc()

# Log the traceback
self.log("Traceback: {0}".format(full_traceback), "DEBUG")

def check_timeout_and_exit(self, loop_start_time, task_id, task_name):
"""
Check if the elapsed time exceeds the specified timeout period and exit the while loop if it does.
Parameters:
- loop_start_time (float): The time when the while loop started.
- task_id (str): ID of the task being monitored.
- task_name (str): Name of the task being monitored.
Returns:
bool: True if the elapsed time exceeds the timeout period, False otherwise.
"""
# If the elapsed time exceeds the timeout period
elapsed_time = time.time() - loop_start_time
if elapsed_time > self.params.get("dnac_api_task_timeout"):
self.msg = "Task {0} with task id {1} has not completed within the timeout period of {2} seconds.".format(
task_name, task_id, int(elapsed_time))

# Update the result with failure status and log the error message
self.set_operation_result("failed", False, self.msg, "ERROR")
return True

return False

def get_taskid_post_api_call(self, api_family, api_function, api_parameters):
"""
Executes the specified API call with given parameters and logs responses.

Parameters:
api_family (str): The API family (e.g., "sda").
api_function (str): The API function (e.g., "add_port_assignments").
api_parameters (dict): The parameters for the API call.
"""
try:
self.log("Entered {0} method".format(api_function), "DEBUG")

# Execute the API call
response = self.dnac._exec(
family=api_family,
function=api_function,
op_modifies=True,
params=api_parameters,
)

self.log(
"Response received from API call to Function: '{0}' from Family: '{1}' is Response: {2}".format(
api_function, api_family, str(response)
),
"DEBUG"
)

# Process the response if available
response_data = response.get("response")
if not response_data:
self.log(
"No response received from API call to Function: '{0}' from Family: '{1}'.".format(
api_function, api_family
), "WARNING"
)
return None

# Update result and extract task ID
self.result.update(dict(response=response_data))
task_id = response_data.get("taskId")
self.log(
"Task ID received from API call to Function: '{0}' from Family: '{1}', Task ID: {2}".format(
api_function, api_family, task_id
), "INFO"
)
return task_id

except Exception as e:
# Log an error message and fail if an exception occurs
self.log_traceback()
self.msg = (
"An error occurred while executing API call to Function: '{0}' from Family: '{1}'. "
"Parameters: {2}. Exception: {3}.".format(api_function, api_family, api_parameters, str(e))
)
self.fail_and_exit(self.msg)

def get_task_status_from_tasks_by_id(self, task_id, task_name, task_params, success_msg):
"""
Retrieves and monitors the status of a task by its task ID.

This function continuously checks the status of a specified task using its task ID.
If the task completes successfully, it updates the message and status accordingly.
If the task fails or times out, it handles the error and updates the status and message.

Parameters:
- task_id (str): The unique identifier of the task to monitor.
- task_name (str): The name of the task being monitored.
- task_params (dict): Additional parameters related to the task.
- success_msg (str): The success message to set if the task completes successfully.

Returns:
- self: The instance of the class with updated status and message.
"""
loop_start_time = time.time()

while True:
response = self.get_tasks_by_id(task_id)

# Check if response is returned
if not response:
self.msg = "Error retrieving task status for '{0}' with task_id '{1}'".format(task_name, task_id)
self.set_operation_result("failed", False, self.msg, "ERROR")
break

status = response.get("status")
end_time = response.get("endTime")

# Check if the elapsed time exceeds the timeout
if self.check_timeout_and_exit(loop_start_time, task_id, task_name):
break

# Check if the task has completed (either success or failure)
if end_time:
if status == "FAILURE":
get_task_details_response = self.get_task_details_by_id(task_id)
failure_reason = get_task_details_response.get("failureReason", "Unknown reason")
self.msg = (
"Task {0} failed with Task ID: {1} for parameters: {2}. "
"Failure reason: {3}".format(task_name, task_id, task_params, failure_reason)
)
self.set_operation_result("failed", False, self.msg, "ERROR")
break
elif status == "SUCCESS":
self.msg = success_msg
self.set_operation_result("success", True, self.msg, "INFO")
break

time.sleep(self.params.get("dnac_task_poll_interval"))
return self


def is_list_complex(x):
return isinstance(x[0], dict) or isinstance(x[0], list)
Expand Down Expand Up @@ -1227,7 +1507,7 @@ def validate_str(item, param_spec, param_name, invalid_params):
Example `param_spec`:
{
"type": "str",
"length_max": 255 # Optional: maximum allowed length
"length_max": 255 # Optional: maximum allowed length
}
"""

Expand Down
Loading