From 0ef2625d5f24090331ba088e61dc9b2ffd3cb601 Mon Sep 17 00:00:00 2001 From: Abhishek-121 Date: Thu, 30 May 2024 23:52:55 +0530 Subject: [PATCH 1/7] Handle the invalid input handle for which predefined options are present for SNMP, Syslog and Email destination and also fix the enhanced validation issue of server_address --- plugins/module_utils/dnac.py | 11 +-- ...ents_and_notifications_workflow_manager.py | 80 +++++++++++++++---- 2 files changed, 70 insertions(+), 21 deletions(-) diff --git a/plugins/module_utils/dnac.py b/plugins/module_utils/dnac.py index 4fa8ce086b..b378420177 100644 --- a/plugins/module_utils/dnac.py +++ b/plugins/module_utils/dnac.py @@ -572,11 +572,12 @@ def is_valid_server_address(self, server_address): # Define the regex for a valid hostname hostname_regex = re.compile( - r'^(?!-)' # Hostname must not start with a hyphen - r'[A-Za-z0-9-]{1,63}' # Hostname segment must be 1-63 characters long - r'(?!-)$' # Hostname segment must not end with a hyphen - r'(\.[A-Za-z0-9-]{1,63})*' # Each segment can be 1-63 characters long - r'(\.[A-Za-z]{2,6})$' # Top-level domain must be 2-6 alphabetic characters + r'^(' # Start of the string + r'([A-Za-z0-9]+([A-Za-z0-9-]*[A-Za-z0-9])?\.)+[A-Za-z]{2,6}|' # Domain name (e.g., example.com) + r'localhost|' # Localhost + r'(\d{1,3}\.)+\d{1,3}|' # Custom IPv4-like format (e.g., 2.2.3.31.3.4.4) + r'[A-Fa-f0-9:]+$' # IPv6 address (e.g., 2f8:192:3::40:41:41:42) + r')$' # End of the string ) # Check if the address is a valid hostname diff --git a/plugins/modules/events_and_notifications_workflow_manager.py b/plugins/modules/events_and_notifications_workflow_manager.py index 5eabd457c0..1acbc5fc7e 100644 --- a/plugins/modules/events_and_notifications_workflow_manager.py +++ b/plugins/modules/events_and_notifications_workflow_manager.py @@ -103,7 +103,8 @@ type: bool email_destination: description: Configure settings to send out emails from Cisco Catalyst Center. Also we can create or configure email destination in Cisco Catalyst - Center only once then later we can just modify it. + Center only once then later we can just modify it. This one is just used to configure the Primary and Secondary SMTP server while configuring + the email destination. It's not related to email event subscription notification. type: dict suboptions: primary_smtp_config: @@ -1048,6 +1049,13 @@ def collect_snmp_playbook_params(self, snmp_details): 'snmpVersion': snmp_details.get('snmp_version') } server_address = snmp_details.get('server_address') + snmp_version = playbook_params.get("snmpVersion") + + if snmp_version and snmp_version not in ["V2C", "V3"]: + self.status = "failed" + self.msg = "Invalid SNMP version '{0}' given in the playbook for configuring SNMP destination".format(snmp_version) + self.log(self.msg, "ERROR") + self.check_return_status() if server_address and not self.is_valid_server_address(server_address): self.status = "failed" @@ -1055,19 +1063,35 @@ def collect_snmp_playbook_params(self, snmp_details): self.log(self.msg, "ERROR") self.check_return_status() - if playbook_params.get('snmpVersion') == "V2C": + if snmp_version == "V2C": playbook_params['community'] = snmp_details.get('community') - elif playbook_params.get('snmpVersion') == "V3": + elif snmp_version == "V3": playbook_params['userName'] = snmp_details.get('username') playbook_params['snmpMode'] = snmp_details.get('mode') + mode = playbook_params['snmpMode'] + auth_type = snmp_details.get('auth_type') - if playbook_params['snmpMode'] == "AUTH_PRIVACY": - playbook_params['snmpAuthType'] = snmp_details.get('auth_type') + if not mode or (mode not in ["AUTH_PRIVACY", "AUTH_NO_PRIVACY", "NO_AUTH_NO_PRIVACY"]): + self.status = "failed" + self.msg = """Invalid SNMP Mode '{0}' given in the playbook for configuring SNMP destination. Please select one of + the mode - AUTH_PRIVACY, AUTH_NO_PRIVACY, NO_AUTH_NO_PRIVACY in the playbook""".format(mode) + self.log(self.msg, "ERROR") + self.check_return_status() + + if auth_type and auth_type not in ["SHA", "MD5"]: + self.status = "failed" + self.msg = """Invalid SNMP Authentication Type '{0}' given in the playbook for configuring SNMP destination. Please + select either SHA or MD5 as authentication type in the playbook""".format(auth_type) + self.log(self.msg, "ERROR") + self.check_return_status() + + if playbook_params.get("snmpMode") == "AUTH_PRIVACY": + playbook_params['snmpAuthType'] = auth_type playbook_params['authPassword'] = snmp_details.get('auth_password') playbook_params['snmpPrivacyType'] = snmp_details.get('privacy_type', 'AES128') playbook_params['privacyPassword'] = snmp_details.get('privacy_password') - elif playbook_params['snmpMode'] == "AUTH_NO_PRIVACY": - playbook_params['snmpAuthType'] = snmp_details.get('auth_type') + elif playbook_params.get("snmpMode") == "AUTH_NO_PRIVACY": + playbook_params['snmpAuthType'] = auth_type playbook_params['authPassword'] = snmp_details.get('auth_password') return playbook_params @@ -1570,11 +1594,19 @@ def collect_email_playbook_params(self, email_details): if email_details.get('primary_smtp_config'): primary_smtp_details = email_details.get('primary_smtp_config') + primary_smtp_type = primary_smtp_details.get('smtp_type', "DEFAULT") + if primary_smtp_type not in ["DEFAULT", "TLS", "SSL"]: + self.status = "failed" + self.msg = """Invalid Primary SMTP Type '{0}' given in the playbook for configuring primary smtp server. + Please select one of the type - DEFAULT, TLS, SSL in the playbook""".format(primary_smtp_type) + self.log(self.msg, "ERROR") + self.check_return_status() + playbook_params['primarySMTPConfig'] = {} playbook_params['primarySMTPConfig']['hostName'] = primary_smtp_details.get('server_address') - playbook_params['primarySMTPConfig']['smtpType'] = primary_smtp_details.get('smtp_type', "DEFAULT") + playbook_params['primarySMTPConfig']['smtpType'] = primary_smtp_type - if playbook_params['primarySMTPConfig']['smtpType'] == 'DEFAULT': + if primary_smtp_type == 'DEFAULT': playbook_params['primarySMTPConfig']['port'] = "25" else: playbook_params['primarySMTPConfig']['port'] = primary_smtp_details.get('port') @@ -1583,9 +1615,18 @@ def collect_email_playbook_params(self, email_details): if email_details.get('secondary_smtp_config'): secondary_smtp_details = email_details.get('secondary_smtp_config') + secondary_smtp_type = secondary_smtp_details.get('smtp_type', "DEFAULT") + + if secondary_smtp_type and secondary_smtp_type not in ["DEFAULT", "TLS", "SSL"]: + self.status = "failed" + self.msg = """Invalid Secondary SMTP Type '{0}' given in the playbook for configuring secondary smtp server. + Please select one of the type - DEFAULT, TLS, SSL in the playbook""".format(secondary_smtp_type) + self.log(self.msg, "ERROR") + self.check_return_status() + playbook_params['secondarySMTPConfig'] = {} playbook_params['secondarySMTPConfig']['hostName'] = secondary_smtp_details.get('server_address') - playbook_params['secondarySMTPConfig']['smtpType'] = secondary_smtp_details.get('smtp_type', "DEFAULT") + playbook_params['secondarySMTPConfig']['smtpType'] = secondary_smtp_type if playbook_params['secondarySMTPConfig']['smtpType'] == 'DEFAULT': playbook_params['secondarySMTPConfig']['port'] = "25" @@ -2167,18 +2208,20 @@ def get_diff_merged(self, config): return self regex_pattern = re.compile( - r'^https://' # Ensure the URL starts with "https://" like https://webhook.cisco.com - r'(([A-Za-z0-9-]+\.)+[A-Za-z]{2,6}|' # Domain name (e.g., example.com, webhook.cisco.com) + r'^https://' # Ensure the URL starts with "https://" + r'(' + r'(([A-Za-z0-9-]+\.)+[A-Za-z]{2,6}|' # Domain name (e.g., example.com) r'localhost|' # Localhost r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|' # IPv4 address (e.g., 192.168.0.1) - r'\[?[A-Fa-f0-9:]+\]?)' # IPv6 address (e.g., [2001:db8::1]) + r'\[[A-Fa-f0-9:]+\]' # IPv6 address (e.g., [2001:db8::1]) + r')' r'(:\d+)?' # Optional port (e.g., :8080) - r'(\/[A-Za-z0-9._~:/?#[@!$&\'()*+,;=-]*)?$' # Path and query (optional) + r'(\/[A-Za-z0-9._~:/?#[@!$&\'()*+,;=-]*)?$)' # Path and query (optional) ) url = webhook_params.get('url') # Check if the input string matches the pattern - if url and not re.match(regex_pattern, url): + if url and not regex_pattern.match(url): self.status = "failed" self.msg = """Given url '{0}' is invalid url for Creating/Updating Webhook destination. It must starts with 'https://' and follow the valid https url format.""".format(url) @@ -2286,7 +2329,7 @@ def get_diff_merged(self, config): if server_address and not self.is_valid_server_address(server_address): self.status = "failed" - self.msg = "Invalid server address '{0}' given in the playbook for configuring syslog destination".format(server_address) + self.msg = "Invalid server address '{0}' given in the playbook for configuring Syslog destination".format(server_address) self.log(self.msg, "ERROR") return self @@ -2477,25 +2520,30 @@ def get_diff_deleted(self, config): self.msg = "Deleting the Webhook destination is not supported in Cisco Catalyst Center because of API limitations" self.log(self.msg, "ERROR") self.result['changed'] = False + return self if config.get('email_destination'): self.status = "failed" self.msg = "Deleting the Email destination is not supported in Cisco Catalyst Center because of API limitations" self.log(self.msg, "ERROR") self.result['changed'] = False + return self if config.get('syslog_destination'): self.status = "failed" self.msg = "Deleting the Syslog destination is not supported in Cisco Catalyst Center because of API limitations" self.log(self.msg, "ERROR") self.result['changed'] = False + return self if config.get('snmp_destination'): self.status = "failed" self.msg = "Deleting the SNMP destination is not supported in Cisco Catalyst Center because of API limitations" self.log(self.msg, "ERROR") self.result['changed'] = False + return self + # Delete ITSM Integration setting from Cisco Catalyst Center if config.get('itsm_setting'): itsm_details = self.want.get('itsm_details') itsm_name = itsm_details.get('instance_name') From fbaf823687446113ace5a4aefd2a9ab7016c9709 Mon Sep 17 00:00:00 2001 From: Abhishek-121 Date: Fri, 31 May 2024 15:09:18 +0530 Subject: [PATCH 2/7] update the regex pattern in webhook destination as it's not accepting ipv6 --- .../events_and_notifications_workflow_manager.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/plugins/modules/events_and_notifications_workflow_manager.py b/plugins/modules/events_and_notifications_workflow_manager.py index 1acbc5fc7e..964a5a3493 100644 --- a/plugins/modules/events_and_notifications_workflow_manager.py +++ b/plugins/modules/events_and_notifications_workflow_manager.py @@ -2210,13 +2210,16 @@ def get_diff_merged(self, config): regex_pattern = re.compile( r'^https://' # Ensure the URL starts with "https://" r'(' - r'(([A-Za-z0-9-]+\.)+[A-Za-z]{2,6}|' # Domain name (e.g., example.com) + r'(([A-Za-z0-9-*.&@]+\.)+[A-Za-z]{2,6})|' # Domain name with wildcards and special characters r'localhost|' # Localhost - r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|' # IPv4 address (e.g., 192.168.0.1) - r'\[[A-Fa-f0-9:]+\]' # IPv6 address (e.g., [2001:db8::1]) + r'(?:(?:\d{1,3}\.){3}\d{1,3}\b\.?)' # Partial or complete IPv4 address with optional trailing dot + r'(\[[A-Fa-f0-9:]+\])?' # Optional IPv6 address in square brackets (e.g., [2001:db8::1]) + r'|' # Alternation for different valid segments + r'([A-Za-z-_.&@]+)' # Hostname with allowed special characters r')' - r'(:\d+)?' # Optional port (e.g., :8080) - r'(\/[A-Za-z0-9._~:/?#[@!$&\'()*+,;=-]*)?$)' # Path and query (optional) + r'(:\d+)?' # Optional port + r'(\/[A-Za-z0-9._~:/?#[@!$&\'()*+,;=-]*)?' # Optional path + r'$' # End of the string ) url = webhook_params.get('url') From c515649c374ffa7b86262213854d8364c6a3ab79 Mon Sep 17 00:00:00 2001 From: MUTHU-RAKESH-27 <19cs127@psgitech.ac.in> Date: Sat, 1 Jun 2024 13:51:27 +0530 Subject: [PATCH 3/7] Removed the subscriber_name for the ISE server. Made required for minimal params for updation --- changelogs/changelog.yaml | 3 +- ...se_radius_integration_workflow_manager.yml | 1 - ...ise_radius_integration_workflow_manager.py | 313 ++++++++++-------- 3 files changed, 170 insertions(+), 147 deletions(-) diff --git a/changelogs/changelog.yaml b/changelogs/changelog.yaml index e033891697..6da0baa0e3 100644 --- a/changelogs/changelog.yaml +++ b/changelogs/changelog.yaml @@ -879,4 +879,5 @@ releases: - provision_workflow_manager - Updated changes related to handle errors. - site_workflow_manager - Updated changes in Site updation. - network_settings_workflow_manager - Added attributes 'ipv4_global_pool_name'. - - template_workflow_manager - Removed attributes 'create_time', 'failure_policy', 'last_update_time', 'latest_version_time', 'parent_template_id', 'project_id', 'validation_errors', 'rollback_template_params' and 'rollback_template_content'. \ No newline at end of file + - template_workflow_manager - Removed attributes 'create_time', 'failure_policy', 'last_update_time', 'latest_version_time', 'parent_template_id', 'project_id', 'validation_errors', 'rollback_template_params' and 'rollback_template_content'. + - ise_radius_integration_workflow_manager - Removed the attributes 'port' and 'subscriber_name'. \ No newline at end of file diff --git a/playbooks/ise_radius_integration_workflow_manager.yml b/playbooks/ise_radius_integration_workflow_manager.yml index 22a0e5ce28..fff66424d6 100644 --- a/playbooks/ise_radius_integration_workflow_manager.yml +++ b/playbooks/ise_radius_integration_workflow_manager.yml @@ -86,7 +86,6 @@ password: abcd fqdn: abc.cisco.com ip_address: 10.195.243.59 - subscriber_name: abcde description: CISCO ISE trusted_server: True diff --git a/plugins/modules/ise_radius_integration_workflow_manager.py b/plugins/modules/ise_radius_integration_workflow_manager.py index c1a537f792..66f1f0343d 100644 --- a/plugins/modules/ise_radius_integration_workflow_manager.py +++ b/plugins/modules/ise_radius_integration_workflow_manager.py @@ -175,11 +175,6 @@ - IP Address of the Cisco ISE Server. - Required for passing the cisco_ise_dtos. type: str - subscriber_name: - description: - - Subscriber name of the Cisco ISE server. - - Required for passing the cisco_ise_dtos. - type: str description: description: Description about the Cisco ISE server. type: str @@ -288,7 +283,6 @@ password: "12345" fqdn: abs.cisco.com ip_address: 10.0.0.2 - subscriber_name: px-1234 description: Cisco ISE trusted_server: True @@ -346,7 +340,6 @@ password: "12345" fqdn: abs.cisco.com ip_address: 10.0.0.2 - subscriber_name: px-1234 description: Cisco ISE - name: Delete an Authentication and Policy server. @@ -476,7 +469,6 @@ def validate_input(self): "password": {"type": 'string'}, "fqdn": {"type": 'string'}, "ip_address": {"type": 'string'}, - "subscriber_name": {"type": 'string'}, "description": {"type": 'string'}, "ssh_key": {"type": 'string'}, }, @@ -745,44 +737,51 @@ def get_want_authentication_policy_server(self, auth_policy_server): """ auth_server = {} + auth_server_exists = self.have.get("authenticationPolicyServer").get("exists") + auth_server_details = self.have.get("authenticationPolicyServer").get("details") trusted_server = False - server_type = auth_policy_server.get("server_type") - if server_type not in ["ISE", "AAA", None]: - self.msg = "server_type should either be ISE or AAA but not {0}.".format(server_type) - self.status = "failed" - return self + if not auth_server_exists: + server_type = auth_policy_server.get("server_type") + if server_type not in ["ISE", "AAA", None]: + self.msg = "server_type should either be ISE or AAA but not {0}.".format(server_type) + self.status = "failed" + return self - if server_type == "ISE": - auth_server.update({"isIseEnabled": True}) + if server_type == "ISE": + auth_server.update({"isIseEnabled": True}) + else: + auth_server.update({"isIseEnabled": False}) else: - auth_server.update({"isIseEnabled": False}) + auth_server.update({"isIseEnabled": auth_server_details.get("isIseEnabled")}) auth_server.update({"ipAddress": auth_policy_server.get("server_ip_address")}) auth_server_exists = self.have.get("authenticationPolicyServer").get("exists") - shared_secret = auth_policy_server.get("shared_secret") - if not (shared_secret or auth_server_exists): - self.msg = "Missing parameter 'shared_secret' is required." - self.status = "failed" - return self - shared_secret = str(shared_secret) - if not (4 <= len(shared_secret) <= 100): - self.msg = "The 'shared_secret' should contain between 4 and 100 characters." - self.status = "failed" - return self + if not auth_server_exists: + shared_secret = auth_policy_server.get("shared_secret") + if not (shared_secret or auth_server_exists): + self.msg = "Missing parameter 'shared_secret' is required." + self.status = "failed" + return self - if " " in shared_secret: - self.msg = "The 'shared_secret' should not contain any spaces." - self.status = "failed" - return self + shared_secret = str(shared_secret) + if not (4 <= len(shared_secret) <= 100): + self.msg = "The 'shared_secret' should contain between 4 and 100 characters." + self.status = "failed" + return self - if "?" in shared_secret or "<" in shared_secret: - self.msg = "The 'shared_secret' should not contain '?' or '<' characters." - self.status = "failed" - return self + if " " in shared_secret: + self.msg = "The 'shared_secret' should not contain any spaces." + self.status = "failed" + return self + + if "?" in shared_secret or "<" in shared_secret: + self.msg = "The 'shared_secret' should not contain '?' or '<' characters." + self.status = "failed" + return self - auth_server.update({"sharedSecret": shared_secret}) + auth_server.update({"sharedSecret": shared_secret}) protocol = auth_policy_server.get("protocol") if protocol not in ["RADIUS", "TACACS", "RADIUS_TACACS", None]: @@ -794,122 +793,141 @@ def get_want_authentication_policy_server(self, auth_policy_server): if protocol is not None: auth_server.update({"protocol": protocol}) else: - auth_server.update({"protocol": "RADIUS"}) + if not auth_server_exists: + auth_server.update({"protocol": "RADIUS"}) + else: + auth_server.update({"protocol": auth_server_details.get("protocol")}) auth_server.update({"port": 49}) - encryption_scheme = auth_policy_server.get("encryption_scheme") - if encryption_scheme not in ["KEYWRAP", "RADSEC", None]: - self.msg = "encryption_scheme should be in ['KEYWRAP', 'RADSEC']. " + \ - "It should not be {0}.".format(encryption_scheme) - self.status = "failed" - return self - - if encryption_scheme: - auth_server.update({"encryptionScheme": encryption_scheme}) - - if encryption_scheme == "KEYWRAP": - message_key = auth_policy_server.get("message_authenticator_code_key") - if not message_key: - self.msg = "The 'message_authenticator_code_key' should not be empty if the encryption_scheme is 'KEYWRAP'." + if not auth_server_exists: + encryption_scheme = auth_policy_server.get("encryption_scheme") + if encryption_scheme not in ["KEYWRAP", "RADSEC", None]: + self.msg = "encryption_scheme should be in ['KEYWRAP', 'RADSEC']. " + \ + "It should not be {0}.".format(encryption_scheme) self.status = "failed" return self - message_key = str(message_key) - message_key_length = len(message_key) - if message_key_length != 20: - self.msg = "The 'message_authenticator_code_key' should be exactly 20 characters." - self.status = "failed" - return self + if encryption_scheme: + auth_server.update({"encryptionScheme": encryption_scheme}) - auth_server.update({"messageKey": message_key}) + if encryption_scheme == "KEYWRAP": + message_key = auth_policy_server.get("message_authenticator_code_key") + if not message_key: + self.msg = "The 'message_authenticator_code_key' should not be empty if the encryption_scheme is 'KEYWRAP'." + self.status = "failed" + return self - encryption_key = auth_policy_server.get("encryption_key") - if not encryption_key: - self.msg = "encryption_key should not be empty if encryption_scheme is 'KEYWRAP'." - self.status = "failed" - return self + message_key = str(message_key) + message_key_length = len(message_key) + if message_key_length != 20: + self.msg = "The 'message_authenticator_code_key' should be exactly 20 characters." + self.status = "failed" + return self - encryption_key = str(encryption_key) - encryption_key_length = len(encryption_key) - if encryption_key_length != 16: - self.msg = "The 'encryption_key' must be 16 characters long. It may contain alphanumeric and special characters." - self.status = "failed" - return self + auth_server.update({"messageKey": message_key}) - auth_server.update({"encryptionKey": encryption_key}) + encryption_key = auth_policy_server.get("encryption_key") + if not encryption_key: + self.msg = "encryption_key should not be empty if encryption_scheme is 'KEYWRAP'." + self.status = "failed" + return self - authentication_port = auth_policy_server.get("authentication_port") - if not authentication_port: - authentication_port = 1812 + encryption_key = str(encryption_key) + encryption_key_length = len(encryption_key) + if encryption_key_length != 16: + self.msg = "The 'encryption_key' must be 16 characters long. It may contain alphanumeric and special characters." + self.status = "failed" + return self - if not str(authentication_port).isdigit(): - self.msg = "The 'authentication_port' should contain only digits." - self.status = "failed" - return self + auth_server.update({"encryptionKey": encryption_key}) - if not 1 <= authentication_port <= 65535: - self.msg = "The 'authentication_port' should be from 1 to 65535." - self.status = "failed" - return self + if not auth_server_exists: + authentication_port = auth_policy_server.get("authentication_port") + if not authentication_port: + authentication_port = 1812 - auth_server.update({"authenticationPort": authentication_port}) + if not str(authentication_port).isdigit(): + self.msg = "The 'authentication_port' should contain only digits." + self.status = "failed" + return self - accounting_port = auth_policy_server.get("accounting_port") - if not accounting_port: - accounting_port = 1813 + if not 1 <= authentication_port <= 65535: + self.msg = "The 'authentication_port' should be from 1 to 65535." + self.status = "failed" + return self - if not str(accounting_port).isdigit(): - self.msg = "The 'accounting_port' should contain only digits." - self.status = "failed" - return self + auth_server.update({"authenticationPort": authentication_port}) + else: + auth_server.update({"authenticationPort": auth_server_details.get("authenticationPort")}) - if not 1 <= accounting_port <= 65535: - self.msg = "The 'accounting_port' should be from 1 to 65535." - self.status = "failed" - return self + if not auth_server_exists: + accounting_port = auth_policy_server.get("accounting_port") + if not accounting_port: + accounting_port = 1813 - auth_server.update({"accountingPort": accounting_port}) + if not str(accounting_port).isdigit(): + self.msg = "The 'accounting_port' should contain only digits." + self.status = "failed" + return self + + if not 1 <= accounting_port <= 65535: + self.msg = "The 'accounting_port' should be from 1 to 65535." + self.status = "failed" + return self + + auth_server.update({"accountingPort": accounting_port}) + else: + auth_server.update({"accountingPort": auth_server_details.get("accountingPort")}) retries = auth_policy_server.get("retries") if not retries: - retries = "3" - - retries = str(retries) - if not retries.isdigit(): - self.msg = "The 'retries' should contain only from 0-9." - self.status = "failed" - return self + if not auth_server_exists: + auth_server.update({"retries": "3"}) + else: + auth_server.update({"retries": auth_server_details.get("retries")}) + else: + retries = str(retries) + if not retries.isdigit(): + self.msg = "The 'retries' should contain only from 0-9." + self.status = "failed" + return self - if not 1 <= int(retries) <= 3: - self.msg = "The 'retries' should be from 1 to 3." - self.status = "failed" - return self + if not 1 <= int(retries) <= 3: + self.msg = "The 'retries' should be from 1 to 3." + self.status = "failed" + return self - auth_server.update({"retries": retries}) + auth_server.update({"retries": retries}) timeout = auth_policy_server.get("timeout") if not timeout: - timeout = "4" - - timeout = str(timeout) - if not timeout.isdigit(): - self.msg = "The 'timeout' should contain only from 0-9." - self.status = "failed" - return self + if not auth_server_exists: + auth_server.update({"timeoutSeconds": "4"}) + else: + auth_server.update({"timeoutSeconds": auth_server_details.get("timeoutSeconds")}) + else: + timeout = str(timeout) + if not timeout.isdigit(): + self.msg = "The 'timeout' should contain only from 0-9." + self.status = "failed" + return self - if not 2 <= int(timeout) <= 20: - self.msg = "The 'timeout' should be from 2 to 20." - self.status = "failed" - return self + if not 2 <= int(timeout) <= 20: + self.msg = "The 'timeout' should be from 2 to 20." + self.status = "failed" + return self - auth_server.update({"timeoutSeconds": timeout}) + auth_server.update({"timeoutSeconds": timeout}) - role = auth_policy_server.get("role") - if role: - auth_server.update({"role": role}) + if not auth_server_exists: + role = auth_policy_server.get("role") + if role: + auth_server.update({"role": role}) + else: + auth_server.update({"role": "secondary"}) else: - auth_server.update({"role": "secondary"}) + auth_server.update({"role": auth_server_details.get("role")}) if auth_server.get("isIseEnabled"): cisco_ise_dtos = auth_policy_server.get("cisco_ise_dtos") @@ -966,15 +984,14 @@ def get_want_authentication_policy_server(self, auth_policy_server): "ipAddress": ip_address }) - subscriber_name = ise_credential.get("subscriber_name") - if not subscriber_name: - self.msg = "Missing parameter 'subscriber_name' is required when server_type is ISE." - self.status = "failed" - return self - - auth_server.get("ciscoIseDtos")[position_ise_creds].update({ - "subscriberName": subscriber_name - }) + if not auth_server_exists: + auth_server.get("ciscoIseDtos")[position_ise_creds].update({ + "subscriberName": "ersadmin" + }) + else: + auth_server.get("ciscoIseDtos")[position_ise_creds].update({ + "subscriberName": auth_server_details.get("ciscoIseDtos")[0].get("subscriberName") + }) description = ise_credential.get("description") if description: @@ -991,16 +1008,22 @@ def get_want_authentication_policy_server(self, auth_policy_server): position_ise_creds += 1 pxgrid_enabled = auth_policy_server.get("pxgrid_enabled") - if pxgrid_enabled is not None: - auth_server.update({"pxgridEnabled": pxgrid_enabled}) + if not pxgrid_enabled: + if not auth_server_exists: + auth_server.update({"pxgridEnabled": True}) + else: + auth_server.update({"pxgridEnabled": auth_server_details.get("pxgridEnabled")}) else: - auth_server.update({"pxgridEnabled": True}) + auth_server.update({"pxgridEnabled": pxgrid_enabled}) use_dnac_cert_for_pxgrid = auth_policy_server.get("use_dnac_cert_for_pxgrid") - if use_dnac_cert_for_pxgrid: - auth_server.update({"useDnacCertForPxgrid": use_dnac_cert_for_pxgrid}) + if not use_dnac_cert_for_pxgrid: + if not auth_server_exists: + auth_server.update({"useDnacCertForPxgrid": False}) + else: + auth_server.update({"useDnacCertForPxgrid": auth_server_details.get("useDnacCertForPxgrid")}) else: - auth_server.update({"useDnacCertForPxgrid": False}) + auth_server.update({"useDnacCertForPxgrid": use_dnac_cert_for_pxgrid}) external_cisco_ise_ip_addr_dtos = auth_policy_server \ .get("external_cisco_ise_ip_addr_dtos") @@ -1119,14 +1142,14 @@ def format_payload_for_update(self, have_auth_server, want_auth_server): self - The current object with updated desired Authentication Policy Server information. """ - if want_auth_server.get("sharedSecret") is not None: - del want_auth_server["sharedSecret"] - if want_auth_server.get("encryptionScheme") is not None: - del want_auth_server["encryptionScheme"] - if want_auth_server.get("messageKey") is not None: - del want_auth_server["messageKey"] - if want_auth_server.get("encryptionKey") is not None: - del want_auth_server["encryptionKey"] + # if want_auth_server.get("sharedSecret") is not None: + # del want_auth_server["sharedSecret"] + # if want_auth_server.get("encryptionScheme") is not None: + # del want_auth_server["encryptionScheme"] + # if want_auth_server.get("messageKey") is not None: + # del want_auth_server["messageKey"] + # if want_auth_server.get("encryptionKey") is not None: + # del want_auth_server["encryptionKey"] update_params = ["authenticationPort", "accountingPort", "role"] for item in update_params: @@ -1190,7 +1213,7 @@ def update_auth_policy_server(self, ipAddress): if is_ise_server: trusted_server = self.want.get("trusted_server") self.accept_cisco_ise_server_certificate(ipAddress, trusted_server) - time.sleep(15) + time.sleep(20) response = self.dnac._exec( family="system_settings", function='get_authentication_and_policy_servers', From 69467dab21cad1ec32da18c7a5d3c6798bcffbc5 Mon Sep 17 00:00:00 2001 From: MUTHU-RAKESH-27 <19cs127@psgitech.ac.in> Date: Mon, 3 Jun 2024 11:49:50 +0530 Subject: [PATCH 4/7] Addressed the review comments --- changelogs/changelog.yaml | 2 +- ...se_radius_integration_workflow_manager.yml | 1 + ...ise_radius_integration_workflow_manager.py | 155 ++++++++++-------- 3 files changed, 92 insertions(+), 66 deletions(-) diff --git a/changelogs/changelog.yaml b/changelogs/changelog.yaml index 6da0baa0e3..7d4450dfcc 100644 --- a/changelogs/changelog.yaml +++ b/changelogs/changelog.yaml @@ -880,4 +880,4 @@ releases: - site_workflow_manager - Updated changes in Site updation. - network_settings_workflow_manager - Added attributes 'ipv4_global_pool_name'. - template_workflow_manager - Removed attributes 'create_time', 'failure_policy', 'last_update_time', 'latest_version_time', 'parent_template_id', 'project_id', 'validation_errors', 'rollback_template_params' and 'rollback_template_content'. - - ise_radius_integration_workflow_manager - Removed the attributes 'port' and 'subscriber_name'. \ No newline at end of file + - ise_radius_integration_workflow_manager - Removed the attributes 'port' and 'subscriber_name'. Added the attribute 'ise_integration_wait_time'. \ No newline at end of file diff --git a/playbooks/ise_radius_integration_workflow_manager.yml b/playbooks/ise_radius_integration_workflow_manager.yml index fff66424d6..9ca9a9181a 100644 --- a/playbooks/ise_radius_integration_workflow_manager.yml +++ b/playbooks/ise_radius_integration_workflow_manager.yml @@ -88,6 +88,7 @@ ip_address: 10.195.243.59 description: CISCO ISE trusted_server: True + ise_integration_wait_time: 20 - name: Delete an ISE Server. cisco.dnac.ise_radius_integration_workflow_manager: diff --git a/plugins/modules/ise_radius_integration_workflow_manager.py b/plugins/modules/ise_radius_integration_workflow_manager.py index 66f1f0343d..6168f1f91c 100644 --- a/plugins/modules/ise_radius_integration_workflow_manager.py +++ b/plugins/modules/ise_radius_integration_workflow_manager.py @@ -201,7 +201,14 @@ description: - Indicates whether the certificate is trustworthy for the server. - Serves as a validation of its authenticity and reliability in secure connections. + default: True type: bool + ise_integration_wait_time: + description: + - Indicates the sleep time after initiating the Cisco ISE integration process. + - Maximum sleep time should be less or equal to 60 seconds. + default: 20 + type: int requirements: - dnacentersdk >= 2.7.0 - python >= 3.9 @@ -285,6 +292,7 @@ ip_address: 10.0.0.2 description: Cisco ISE trusted_server: True + ise_integration_wait_time: 20 - name: Update an AAA server. cisco.dnac.ise_radius_integration_workflow_manager: @@ -479,7 +487,9 @@ def validate_input(self): "external_ip_address": {"type": 'string'}, }, "ise_type": {"type": 'string'}, - } + }, + "trusted_server": {"type": 'bool'}, + "ise_integration_wait_time": {"type": 'integer'} } } @@ -743,7 +753,7 @@ def get_want_authentication_policy_server(self, auth_policy_server): if not auth_server_exists: server_type = auth_policy_server.get("server_type") if server_type not in ["ISE", "AAA", None]: - self.msg = "server_type should either be ISE or AAA but not {0}.".format(server_type) + self.msg = "The server_type should either be ISE or AAA but not {0}.".format(server_type) self.status = "failed" return self @@ -760,7 +770,7 @@ def get_want_authentication_policy_server(self, auth_policy_server): if not auth_server_exists: shared_secret = auth_policy_server.get("shared_secret") - if not (shared_secret or auth_server_exists): + if not shared_secret: self.msg = "Missing parameter 'shared_secret' is required." self.status = "failed" return self @@ -771,15 +781,12 @@ def get_want_authentication_policy_server(self, auth_policy_server): self.status = "failed" return self - if " " in shared_secret: - self.msg = "The 'shared_secret' should not contain any spaces." - self.status = "failed" - return self - - if "?" in shared_secret or "<" in shared_secret: - self.msg = "The 'shared_secret' should not contain '?' or '<' characters." - self.status = "failed" - return self + invalid_chars = " ?<" + for char in invalid_chars: + if char in shared_secret: + self.msg = "The 'shared_secret' should not contain spaces or the characters '?', '<'." + self.status = "failed" + return self auth_server.update({"sharedSecret": shared_secret}) @@ -803,7 +810,7 @@ def get_want_authentication_policy_server(self, auth_policy_server): if not auth_server_exists: encryption_scheme = auth_policy_server.get("encryption_scheme") if encryption_scheme not in ["KEYWRAP", "RADSEC", None]: - self.msg = "encryption_scheme should be in ['KEYWRAP', 'RADSEC']. " + \ + self.msg = "The encryption_scheme should be in ['KEYWRAP', 'RADSEC']. " + \ "It should not be {0}.".format(encryption_scheme) self.status = "failed" return self @@ -829,7 +836,7 @@ def get_want_authentication_policy_server(self, auth_policy_server): encryption_key = auth_policy_server.get("encryption_key") if not encryption_key: - self.msg = "encryption_key should not be empty if encryption_scheme is 'KEYWRAP'." + self.msg = "The encryption_key should not be empty if encryption_scheme is 'KEYWRAP'." self.status = "failed" return self @@ -887,47 +894,51 @@ def get_want_authentication_policy_server(self, auth_policy_server): else: auth_server.update({"retries": auth_server_details.get("retries")}) else: - retries = str(retries) - if not retries.isdigit(): + try: + retries = str(retries) + if not 1 <= int(retries) <= 3: + self.msg = "The 'retries' should be from 1 to 3." + self.status = "failed" + return self + except ValueError: self.msg = "The 'retries' should contain only from 0-9." self.status = "failed" return self - if not 1 <= int(retries) <= 3: - self.msg = "The 'retries' should be from 1 to 3." - self.status = "failed" - return self - auth_server.update({"retries": retries}) timeout = auth_policy_server.get("timeout") - if not timeout: - if not auth_server_exists: - auth_server.update({"timeoutSeconds": "4"}) - else: - auth_server.update({"timeoutSeconds": auth_server_details.get("timeoutSeconds")}) + if not auth_server_exists: + default_timeout = "4" else: - timeout = str(timeout) - if not timeout.isdigit(): - self.msg = "The 'timeout' should contain only from 0-9." - self.status = "failed" - return self + default_timeout = str(auth_server_details.get("timeoutSeconds")) - if not 2 <= int(timeout) <= 20: - self.msg = "The 'timeout' should be from 2 to 20." + # If 'timeout' is not provided, use 'default_timeout' + if timeout is None: + auth_server.update({"timeoutSeconds": default_timeout}) + else: + try: + timeout_int = int(timeout) + if timeout_int < 2 or timeout_int > 20: + self.msg = "The 'timeout' should be from 2 to 20." + self.status = "failed" + return self + + auth_server.update({"timeoutSeconds": str(timeout)}) + except ValueError: + self.msg = "The 'time_out' must contain only digits." self.status = "failed" return self - auth_server.update({"timeoutSeconds": timeout}) - + # Determine the role based on whether the auth server exists and if the role is specified if not auth_server_exists: - role = auth_policy_server.get("role") - if role: - auth_server.update({"role": role}) - else: - auth_server.update({"role": "secondary"}) + # Use the role from 'auth_policy_server' if available, otherwise default to "secondary" + role = auth_policy_server.get("role", "secondary") else: - auth_server.update({"role": auth_server_details.get("role")}) + # Use the role from 'auth_server_details' + role = auth_server_details.get("role") + + auth_server.update({"role": role}) if auth_server.get("isIseEnabled"): cisco_ise_dtos = auth_policy_server.get("cisco_ise_dtos") @@ -1008,22 +1019,22 @@ def get_want_authentication_policy_server(self, auth_policy_server): position_ise_creds += 1 pxgrid_enabled = auth_policy_server.get("pxgrid_enabled") - if not pxgrid_enabled: - if not auth_server_exists: - auth_server.update({"pxgridEnabled": True}) + if pxgrid_enabled is None: + if auth_server_exists: + pxgrid_enabled = auth_server_details.get("pxgridEnabled") else: - auth_server.update({"pxgridEnabled": auth_server_details.get("pxgridEnabled")}) - else: - auth_server.update({"pxgridEnabled": pxgrid_enabled}) + pxgrid_enabled = True + + auth_server.update({"pxgridEnabled": pxgrid_enabled}) use_dnac_cert_for_pxgrid = auth_policy_server.get("use_dnac_cert_for_pxgrid") - if not use_dnac_cert_for_pxgrid: - if not auth_server_exists: - auth_server.update({"useDnacCertForPxgrid": False}) + if use_dnac_cert_for_pxgrid is None: + if auth_server_exists: + use_dnac_cert_for_pxgrid = auth_server_details.get("useDnacCertForPxgrid") else: - auth_server.update({"useDnacCertForPxgrid": auth_server_details.get("useDnacCertForPxgrid")}) - else: - auth_server.update({"useDnacCertForPxgrid": use_dnac_cert_for_pxgrid}) + use_dnac_cert_for_pxgrid = False + + auth_server.update({"useDnacCertForPxgrid": use_dnac_cert_for_pxgrid}) external_cisco_ise_ip_addr_dtos = auth_policy_server \ .get("external_cisco_ise_ip_addr_dtos") @@ -1052,13 +1063,35 @@ def get_want_authentication_policy_server(self, auth_policy_server): .update({"type": ise_type}) position_ise_addresses += 1 - if auth_policy_server.get("trusted_server"): + trusted_server = auth_policy_server.get("trusted_server") + if auth_policy_server.get("trusted_server") is None: trusted_server = True + else: + trusted_server = auth_policy_server.get("trusted_server") + + self.want.update({"trusted_server": trusted_server}) + + ise_integration_wait_time = auth_policy_server.get("ise_integration_wait_time") + if ise_integration_wait_time is None: + ise_integration_wait_time = 20 + else: + try: + ise_integration_wait_time_int = int(ise_integration_wait_time) + if ise_integration_wait_time_int < 1 or ise_integration_wait_time_int > 60: + self.msg = "The ise_integration_wait_time should be from 1 to 60 seconds." + self.status = "failed" + return self + + except ValueError: + self.msg = "The 'ise_integration_wait_time' should contain only digits." + self.status = "failed" + return self + + self.want.update({"ise_integration_wait_time": ise_integration_wait_time}) self.log("Authentication and Policy Server playbook details: {0}" .format(auth_server), "DEBUG") self.want.update({"authenticationPolicyServer": auth_server}) - self.want.update({"trusted_server": trusted_server}) self.msg = "Collecting the Authentication and Policy Server details from the playbook" self.status = "success" return self @@ -1142,15 +1175,6 @@ def format_payload_for_update(self, have_auth_server, want_auth_server): self - The current object with updated desired Authentication Policy Server information. """ - # if want_auth_server.get("sharedSecret") is not None: - # del want_auth_server["sharedSecret"] - # if want_auth_server.get("encryptionScheme") is not None: - # del want_auth_server["encryptionScheme"] - # if want_auth_server.get("messageKey") is not None: - # del want_auth_server["messageKey"] - # if want_auth_server.get("encryptionKey") is not None: - # del want_auth_server["encryptionKey"] - update_params = ["authenticationPort", "accountingPort", "role"] for item in update_params: have_auth_server_item = have_auth_server.get(item) @@ -1213,7 +1237,8 @@ def update_auth_policy_server(self, ipAddress): if is_ise_server: trusted_server = self.want.get("trusted_server") self.accept_cisco_ise_server_certificate(ipAddress, trusted_server) - time.sleep(20) + ise_integration_wait_time = self.want.get("ise_integration_wait_time") + time.sleep(ise_integration_wait_time) response = self.dnac._exec( family="system_settings", function='get_authentication_and_policy_servers', From 1bc101aa5b7f037e0043cfb5ea6b89848afb90b9 Mon Sep 17 00:00:00 2001 From: MUTHU-RAKESH-27 <19cs127@psgitech.ac.in> Date: Mon, 3 Jun 2024 13:06:28 +0530 Subject: [PATCH 5/7] Addressed the review comments --- plugins/modules/ise_radius_integration_workflow_manager.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/plugins/modules/ise_radius_integration_workflow_manager.py b/plugins/modules/ise_radius_integration_workflow_manager.py index 6168f1f91c..aad8187bb3 100644 --- a/plugins/modules/ise_radius_integration_workflow_manager.py +++ b/plugins/modules/ise_radius_integration_workflow_manager.py @@ -895,8 +895,8 @@ def get_want_authentication_policy_server(self, auth_policy_server): auth_server.update({"retries": auth_server_details.get("retries")}) else: try: - retries = str(retries) - if not 1 <= int(retries) <= 3: + retries_int = int(retries) + if not 1 <= retries_int <= 3: self.msg = "The 'retries' should be from 1 to 3." self.status = "failed" return self @@ -905,7 +905,7 @@ def get_want_authentication_policy_server(self, auth_policy_server): self.status = "failed" return self - auth_server.update({"retries": retries}) + auth_server.update({"retries": str(retries)}) timeout = auth_policy_server.get("timeout") if not auth_server_exists: From 591d88881eaa246e78df0654736be237e6dbda98 Mon Sep 17 00:00:00 2001 From: MUTHU-RAKESH-27 <19cs127@psgitech.ac.in> Date: Mon, 3 Jun 2024 13:25:12 +0530 Subject: [PATCH 6/7] Addressed the review comments --- .../modules/ise_radius_integration_workflow_manager.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/plugins/modules/ise_radius_integration_workflow_manager.py b/plugins/modules/ise_radius_integration_workflow_manager.py index aad8187bb3..acf475a0ff 100644 --- a/plugins/modules/ise_radius_integration_workflow_manager.py +++ b/plugins/modules/ise_radius_integration_workflow_manager.py @@ -776,7 +776,7 @@ def get_want_authentication_policy_server(self, auth_policy_server): return self shared_secret = str(shared_secret) - if not (4 <= len(shared_secret) <= 100): + if len(shared_secret) < 4 or len(shared_secret) > 100: self.msg = "The 'shared_secret' should contain between 4 and 100 characters." self.status = "failed" return self @@ -859,7 +859,7 @@ def get_want_authentication_policy_server(self, auth_policy_server): self.status = "failed" return self - if not 1 <= authentication_port <= 65535: + if authentication_port < 1 or authentication_port > 65535: self.msg = "The 'authentication_port' should be from 1 to 65535." self.status = "failed" return self @@ -878,7 +878,7 @@ def get_want_authentication_policy_server(self, auth_policy_server): self.status = "failed" return self - if not 1 <= accounting_port <= 65535: + if accounting_port < 1 or accounting_port > 65535: self.msg = "The 'accounting_port' should be from 1 to 65535." self.status = "failed" return self @@ -896,7 +896,7 @@ def get_want_authentication_policy_server(self, auth_policy_server): else: try: retries_int = int(retries) - if not 1 <= retries_int <= 3: + if retries_int < 1 or retries_int > 3: self.msg = "The 'retries' should be from 1 to 3." self.status = "failed" return self From d64cc272112e9a769b5616097aa37c46203292d9 Mon Sep 17 00:00:00 2001 From: MUTHU-RAKESH-27 <19cs127@psgitech.ac.in> Date: Tue, 4 Jun 2024 09:35:05 +0530 Subject: [PATCH 7/7] Removed the params which cannot be updated from the example playbook --- plugins/modules/ise_radius_integration_workflow_manager.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/plugins/modules/ise_radius_integration_workflow_manager.py b/plugins/modules/ise_radius_integration_workflow_manager.py index acf475a0ff..b8c576a6cd 100644 --- a/plugins/modules/ise_radius_integration_workflow_manager.py +++ b/plugins/modules/ise_radius_integration_workflow_manager.py @@ -312,11 +312,8 @@ server_type: AAA server_ip_address: 10.0.0.1 protocol: RADIUS_TACACS - authentication_port: 1812 - accounting_port: 1813 retries: 3 timeout: 5 - role: secondary - name: Update an Cisco ISE server. cisco.dnac.ise_radius_integration_workflow_manager: @@ -336,11 +333,8 @@ server_type: ISE server_ip_address: 10.0.0.2 protocol: RADIUS_TACACS - authentication_port: 1812 - accounting_port: 1813 retries: 3 timeout: 5 - role: primary use_dnac_cert_for_pxgrid: False pxgrid_enabled: True cisco_ise_dtos: