From fbda270d4c440451c8b2ee31e0443ea3b47e56f7 Mon Sep 17 00:00:00 2001 From: ClayJay3 Date: Tue, 13 Jun 2023 14:02:12 -0500 Subject: [PATCH 1/9] Added some vlan options. --- interface/main_window.py | 92 +++++++++++++++++++++++++--------------- utils/deploy_options.py | 17 ++++++-- utils/open_connection.py | 2 + 3 files changed, 74 insertions(+), 37 deletions(-) diff --git a/interface/main_window.py b/interface/main_window.py index a1f5925..e28dcf1 100644 --- a/interface/main_window.py +++ b/interface/main_window.py @@ -11,7 +11,7 @@ from tkinter import font from netmiko.exceptions import ReadTimeout from interface.popup_window import text_popup -from utils.deploy_options import change_access_port_vlans, setup_dhcp_snooping_on_trunks, setup_dynamic_arp_inspection_on_trunks +from utils.deploy_options import change_port_vlans, setup_dhcp_snooping_on_trunks, setup_dynamic_arp_inspection_on_trunks from pyvis.network import Network from utils.open_connection import ssh_autodetect_info, ssh_telnet @@ -80,6 +80,7 @@ def initialize_window(self) -> None: self.closest_hop_entry = None self.enable_telnet_check = None self.change_vlan_check = None + self.toggle_voice_vlan_check = None self.force_telnet_check = None self.vlan_old_entry = None self.vlan_new_entry = None @@ -104,6 +105,7 @@ def initialize_window(self) -> None: # Create checkbox variables. self.enable_telnet_check = tk.BooleanVar(self.window) self.change_vlan_check = tk.BooleanVar(self.window) + self.toggle_voice_vlan_check = tk.BooleanVar(self.window) self.turbo_deploy_check = tk.BooleanVar(self.window) self.force_telnet_check = tk.BooleanVar(self.window) self.dhcp_snooping_check = tk.BooleanVar(self.window) @@ -228,41 +230,63 @@ def initialize_window(self) -> None: new_secret_entry.grid(row=5, column=9, sticky=tk.NSEW) self.secret_entrys.append(new_secret_entry) + ######################################################################################################### # Populate option frame. - secret_label = tk.Label(master=options_frame, text="Options:", font=(self.font, 14)) - secret_label.grid(row=0, column=0, columnspan=10, sticky=tk.NSEW) - enable_vlan_change_checkbox = tk.Checkbutton(master=options_frame, variable=self.change_vlan_check, onvalue=True, offvalue=False) - enable_vlan_change_checkbox.grid(row=1, rowspan=1, column=0, columnspan=1, sticky=tk.E) - vlan_change_text = tk.Label(master=options_frame, text="Change VLAN", font=(self.font, 10)) - vlan_change_text.grid(row=1, column=1, columnspan=1, sticky=tk.W) - self.vlan_old_entry = tk.Entry(master=options_frame, width=10, validate="all", validatecommand=(options_frame.register(lambda input:True if str.isdigit(input) or input == "" else False), "%P")) - self.vlan_old_entry.grid(row=1, column=2, sticky=tk.W) - vlan_change_text = tk.Label(master=options_frame, text=" access ports to VLAN", font=(self.font, 10)) - vlan_change_text.grid(row=1, column=3, columnspan=1, sticky=tk.W) - self.vlan_new_entry = tk.Entry(master=options_frame, width=10, validate="all", validatecommand=(options_frame.register(lambda input:True if (str.isdigit(input) or input == "") else False), "%P")) - self.vlan_new_entry.grid(row=1, column=4, sticky=tk.W) - enable_turbo_deploy_checkbox = tk.Checkbutton(master=options_frame, variable=self.turbo_deploy_check, onvalue=True, offvalue=False) - enable_turbo_deploy_checkbox.grid(row=2, rowspan=1, column=0, columnspan=1, sticky=tk.E) - vlan_change_text = tk.Label(master=options_frame, text="Enable Turbo Deploy? (WARNING: Doesn't ask user for confirmation)", font=(self.font, 10)) - vlan_change_text.grid(row=2, column=1, columnspan=5, sticky=tk.W) - enable_dhcp_snooping_checkbox = tk.Checkbutton(master=options_frame, variable=self.dhcp_snooping_check, onvalue=True, offvalue=False) - enable_dhcp_snooping_checkbox.grid(row=3, rowspan=1, column=0, columnspan=1, sticky=tk.E) - dhcp_vlan_text = tk.Label(master=options_frame, text="Enable DHCP Snooping on vlans", font=(self.font, 10)) - dhcp_vlan_text.grid(row=3, column=1, columnspan=2, sticky=tk.W) - self.dhcp_snoop_vlan_entry = tk.Entry(master=options_frame, width=10, validate="all", validatecommand=(options_frame.register(lambda input:True if (re.match("^[0-9,-]*$", input) or input == "") else False), "%P")) - self.dhcp_snoop_vlan_entry.grid(row=3, column=3, sticky=tk.W) + ######################################################################################################### + options_label = tk.Label(master=options_frame, text="Options:", font=(self.font, 14)) + options_label.grid(row=0, column=0, columnspan=10, sticky=tk.NSEW) + ######################################################################################################### + vlan_options_frame = tk.Frame(master=options_frame, relief=tk.GROOVE, borderwidth=2) # Create frame from grouping vlan options. + vlan_options_frame.grid(row=1, rowspan=2, column=0, columnspan=5, sticky=tk.NSEW) + vlan_options_frame.rowconfigure(self.grid_size, weight=1) + vlan_options_frame.columnconfigure(self.grid_size, weight=1) + enable_vlan_change_checkbox = tk.Checkbutton(master=vlan_options_frame, variable=self.change_vlan_check, onvalue=True, offvalue=False) + enable_vlan_change_checkbox.grid(row=0, rowspan=1, column=0, columnspan=1, sticky=tk.E) + vlan_change_text = tk.Label(master=vlan_options_frame, text="Change VLAN", font=(self.font, 10)) + vlan_change_text.grid(row=0, column=1, columnspan=1, sticky=tk.W) + self.vlan_old_entry = tk.Entry(master=vlan_options_frame, width=10, validate="all", validatecommand=(vlan_options_frame.register(lambda input:True if str.isdigit(input) or input == "" else False), "%P")) + self.vlan_old_entry.grid(row=0, column=2, sticky=tk.W) + vlan_change_text = tk.Label(master=vlan_options_frame, text=" access ports to VLAN", font=(self.font, 10)) + vlan_change_text.grid(row=0, column=3, columnspan=1, sticky=tk.W) + self.vlan_new_entry = tk.Entry(master=vlan_options_frame, width=10, validate="all", validatecommand=(vlan_options_frame.register(lambda input:True if (str.isdigit(input) or input == "") else False), "%P")) + self.vlan_new_entry.grid(row=0, column=4, sticky=tk.W) + access_vlan_radio_select = tk.Radiobutton(master=vlan_options_frame, text="Change access VLANs", variable=self.toggle_voice_vlan_check, value=False) + access_vlan_radio_select.grid(row=1, column=0, columnspan=3, sticky=tk.W) + voice_vlan_radio_select = tk.Radiobutton(master=vlan_options_frame, text="Change voice VLANs", variable=self.toggle_voice_vlan_check, value=True) + voice_vlan_radio_select.grid(row=1, column=3, columnspan=3, sticky=tk.W) + ######################################################################################################### + turbo_options_frame = tk.Frame(master=options_frame, relief=tk.GROOVE, borderwidth=2) # Create frame from grouping turbo options. + turbo_options_frame.grid(row=3, rowspan=2, column=0, columnspan=5, sticky=tk.NSEW) + turbo_options_frame.rowconfigure(self.grid_size, weight=1) + turbo_options_frame.columnconfigure(self.grid_size, weight=1) + enable_turbo_deploy_checkbox = tk.Checkbutton(master=turbo_options_frame, variable=self.turbo_deploy_check, onvalue=True, offvalue=False) + enable_turbo_deploy_checkbox.grid(row=0, rowspan=1, column=0, columnspan=1, sticky=tk.E) + turbo_deploy_text = tk.Label(master=turbo_options_frame, text="Enable Turbo Deploy? (WARNING: Doesn't ask user for confirmation)", font=(self.font, 10)) + turbo_deploy_text.grid(row=0, column=1, columnspan=5, sticky=tk.W) + ######################################################################################################### + dhcp_arp_options_frame = tk.Frame(master=options_frame, relief=tk.GROOVE, borderwidth=2) # Create frame from grouping turbo options. + dhcp_arp_options_frame.grid(row=5, rowspan=2, column=0, columnspan=5, sticky=tk.NSEW) + dhcp_arp_options_frame.rowconfigure(self.grid_size, weight=1) + dhcp_arp_options_frame.columnconfigure(self.grid_size, weight=1) + enable_dhcp_snooping_checkbox = tk.Checkbutton(master=dhcp_arp_options_frame, variable=self.dhcp_snooping_check, onvalue=True, offvalue=False) + enable_dhcp_snooping_checkbox.grid(row=0, rowspan=1, column=0, columnspan=1, sticky=tk.E) + dhcp_vlan_text = tk.Label(master=dhcp_arp_options_frame, text="Enable DHCP Snooping on vlans", font=(self.font, 10)) + dhcp_vlan_text.grid(row=0, column=1, columnspan=2, sticky=tk.W) + self.dhcp_snoop_vlan_entry = tk.Entry(master=dhcp_arp_options_frame, width=10, validate="all", validatecommand=(dhcp_arp_options_frame.register(lambda input:True if (re.match("^[0-9,-]*$", input) or input == "") else False), "%P")) + self.dhcp_snoop_vlan_entry.grid(row=0, column=3, sticky=tk.W) self.dhcp_snoop_vlan_entry.insert(0, "1,2-4,5") - self.disable_dhcp_snooping_option82_checkbox = tk.Checkbutton(master=options_frame, variable=self.dhcp_snooping_option82_check, onvalue=True, offvalue=False) - self.disable_dhcp_snooping_option82_checkbox.grid(row=4, rowspan=1, column=0, columnspan=1, sticky=tk.E) - dhcp_option82_text = tk.Label(master=options_frame, text="Disable option82 injection. (Improves compatibility with DCHP servers)", font=(self.font, 10)) - dhcp_option82_text.grid(row=4, column=1, columnspan=4, sticky=tk.W) - self.enable_arp_inspection_checkbox = tk.Checkbutton(master=options_frame, variable=self.arp_inspection_check, onvalue=True, offvalue=False) - self.enable_arp_inspection_checkbox.grid(row=5, rowspan=1, column=0, columnspan=1, sticky=tk.E) - arp_vlan_text = tk.Label(master=options_frame, text="Enable ARP inspection on vlans", font=(self.font, 10)) - arp_vlan_text.grid(row=5, column=1, columnspan=2, sticky=tk.W) - self.arp_inspection_vlan_entry = tk.Entry(master=options_frame, width=10, validate="all", validatecommand=(options_frame.register(lambda input:True if (re.match("^[0-9,-]*$", input) or input == "") else False), "%P")) - self.arp_inspection_vlan_entry.grid(row=5, column=3, sticky=tk.W) + self.disable_dhcp_snooping_option82_checkbox = tk.Checkbutton(master=dhcp_arp_options_frame, variable=self.dhcp_snooping_option82_check, onvalue=True, offvalue=False) + self.disable_dhcp_snooping_option82_checkbox.grid(row=1, rowspan=1, column=0, columnspan=1, sticky=tk.E) + dhcp_option82_text = tk.Label(master=dhcp_arp_options_frame, text="Disable option82 injection. (Improves compatibility with DCHP servers)", font=(self.font, 10)) + dhcp_option82_text.grid(row=1, column=1, columnspan=4, sticky=tk.W) + self.enable_arp_inspection_checkbox = tk.Checkbutton(master=dhcp_arp_options_frame, variable=self.arp_inspection_check, onvalue=True, offvalue=False) + self.enable_arp_inspection_checkbox.grid(row=2, rowspan=1, column=0, columnspan=1, sticky=tk.E) + arp_vlan_text = tk.Label(master=dhcp_arp_options_frame, text="Enable ARP inspection on vlans", font=(self.font, 10)) + arp_vlan_text.grid(row=2, column=1, columnspan=2, sticky=tk.W) + self.arp_inspection_vlan_entry = tk.Entry(master=dhcp_arp_options_frame, width=10, validate="all", validatecommand=(dhcp_arp_options_frame.register(lambda input:True if (re.match("^[0-9,-]*$", input) or input == "") else False), "%P")) + self.arp_inspection_vlan_entry.grid(row=2, column=3, sticky=tk.W) self.arp_inspection_vlan_entry.insert(0, "1,2-4,5") + ######################################################################################################### # Populate console frame. self.list = tk.Listbox(master=console_frame, background="black", foreground="green", highlightcolor="green") @@ -725,7 +749,7 @@ def deploy_button_back_process(self, command_text, devices, usernames, passwords # Check if the user has enabled vlan changing. if self.change_vlan_check.get(): # Get vlan new and old numbers from the user. - vlan_command_text = change_access_port_vlans(self.vlan_old_entry.get(), self.vlan_new_entry.get(), ssh_device) + vlan_command_text = change_port_vlans(self.vlan_old_entry.get(), self.vlan_new_entry.get(), self.toggle_voice_vlan_check.get(), ssh_device) # Run the commands on the switch and show output, then ask the user if the output looks good. output += "\n\n" diff --git a/utils/deploy_options.py b/utils/deploy_options.py index a908719..9b55749 100644 --- a/utils/deploy_options.py +++ b/utils/deploy_options.py @@ -22,7 +22,7 @@ def find_ranges(iterable): else: yield group[0], group[-1] -def change_access_port_vlans(old_vlan, new_vlan, ssh_device): +def change_port_vlans(old_vlan, new_vlan, toggle_voice_vlans, ssh_device): # Create instance variables and objects. logger = logging.getLogger(__name__) command_text = "end\nconf t\n" @@ -67,7 +67,14 @@ def change_access_port_vlans(old_vlan, new_vlan, ssh_device): interface_ranges = interface_ranges[:-2] # Build commands list for vlan change. command_text += f"int range {interface_ranges}\n" - command_text += f"sw acc vlan {new_vlan}\n" + + # Check if we are changing access vlans or voice vlans. + if toggle_voice_vlans: + command_text += f"sw acc vlan {new_vlan}\n" + else: + command_text += f"sw voice vlan {new_vlan}\n" + + # Clear interface_range text and reset counter. interface_ranges = "" range_counter = 0 @@ -88,7 +95,11 @@ def change_access_port_vlans(old_vlan, new_vlan, ssh_device): interface_ranges = interface_ranges[:-2] # Build commands list for vlan change. command_text += f"int range {interface_ranges}\n" - command_text += f"sw acc vlan {new_vlan}\n" + # Check if we are changing access vlans or voice vlans. + if toggle_voice_vlans: + command_text += f"sw acc vlan {new_vlan}\n" + else: + command_text += f"sw voice vlan {new_vlan}\n" # Add end to exit global config mode. command_text += "end\n" diff --git a/utils/open_connection.py b/utils/open_connection.py index 19ff18c..51345b5 100644 --- a/utils/open_connection.py +++ b/utils/open_connection.py @@ -117,6 +117,8 @@ def ssh_autodetect_info(usernames, passwords, enable_secrets, enable_telnet, for break except ValueError: logger.error(f"Unable to find switch prompt for {ip_addr}") + except OSError: + logger.error(f"Device forcefully closed the socket. Are your creds locked out?") else: # Set default value. remote_device["ip_address"] = ip_addr From 0b46559c0854113411558f00a9b27e90a1e125d6 Mon Sep 17 00:00:00 2001 From: ClayJay3 Date: Wed, 14 Jun 2023 16:21:38 -0500 Subject: [PATCH 2/9] Tested access vlan and voice vlan toggle. --- interface/main_window.py | 16 +++++++++++----- utils/deploy_options.py | 8 ++++---- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/interface/main_window.py b/interface/main_window.py index e28dcf1..73e09fd 100644 --- a/interface/main_window.py +++ b/interface/main_window.py @@ -84,6 +84,8 @@ def initialize_window(self) -> None: self.force_telnet_check = None self.vlan_old_entry = None self.vlan_new_entry = None + self.access_vlan_radio_select = None + self.voice_vlan_radio_select = None self.vlan_entries_state_enabled = True self.dhcp_snoop_vlan_entry = None self.disable_dhcp_snooping_option82_checkbox = None @@ -246,14 +248,14 @@ def initialize_window(self) -> None: vlan_change_text.grid(row=0, column=1, columnspan=1, sticky=tk.W) self.vlan_old_entry = tk.Entry(master=vlan_options_frame, width=10, validate="all", validatecommand=(vlan_options_frame.register(lambda input:True if str.isdigit(input) or input == "" else False), "%P")) self.vlan_old_entry.grid(row=0, column=2, sticky=tk.W) - vlan_change_text = tk.Label(master=vlan_options_frame, text=" access ports to VLAN", font=(self.font, 10)) + vlan_change_text = tk.Label(master=vlan_options_frame, text=" ports to VLAN", font=(self.font, 10)) vlan_change_text.grid(row=0, column=3, columnspan=1, sticky=tk.W) self.vlan_new_entry = tk.Entry(master=vlan_options_frame, width=10, validate="all", validatecommand=(vlan_options_frame.register(lambda input:True if (str.isdigit(input) or input == "") else False), "%P")) self.vlan_new_entry.grid(row=0, column=4, sticky=tk.W) - access_vlan_radio_select = tk.Radiobutton(master=vlan_options_frame, text="Change access VLANs", variable=self.toggle_voice_vlan_check, value=False) - access_vlan_radio_select.grid(row=1, column=0, columnspan=3, sticky=tk.W) - voice_vlan_radio_select = tk.Radiobutton(master=vlan_options_frame, text="Change voice VLANs", variable=self.toggle_voice_vlan_check, value=True) - voice_vlan_radio_select.grid(row=1, column=3, columnspan=3, sticky=tk.W) + self.access_vlan_radio_select = tk.Radiobutton(master=vlan_options_frame, text="Change access VLANs", variable=self.toggle_voice_vlan_check, value=False) + self.access_vlan_radio_select.grid(row=1, column=0, columnspan=10, sticky=tk.W) + self.voice_vlan_radio_select = tk.Radiobutton(master=vlan_options_frame, text="Change voice VLANs", variable=self.toggle_voice_vlan_check, value=True) + self.voice_vlan_radio_select.grid(row=1, column=3, columnspan=6, sticky=tk.W) ######################################################################################################### turbo_options_frame = tk.Frame(master=options_frame, relief=tk.GROOVE, borderwidth=2) # Create frame from grouping turbo options. turbo_options_frame.grid(row=3, rowspan=2, column=0, columnspan=5, sticky=tk.NSEW) @@ -933,12 +935,16 @@ def update_window(self) -> None: # Disable elements. self.vlan_old_entry.configure(state="disable") self.vlan_new_entry.configure(state="disable") + self.access_vlan_radio_select.configure(state="disable") + self.voice_vlan_radio_select.configure(state="disable") # Update toggle var. self.vlan_entries_state_enabled = False elif self.change_vlan_check.get() and not self.vlan_entries_state_enabled: # Disable elements. self.vlan_old_entry.configure(state="normal") self.vlan_new_entry.configure(state="normal") + self.access_vlan_radio_select.configure(state="normal") + self.voice_vlan_radio_select.configure(state="normal") # Update toggle var. self.vlan_entries_state_enabled = True # DHCP entries and checkboxes. diff --git a/utils/deploy_options.py b/utils/deploy_options.py index 9b55749..752284d 100644 --- a/utils/deploy_options.py +++ b/utils/deploy_options.py @@ -70,9 +70,9 @@ def change_port_vlans(old_vlan, new_vlan, toggle_voice_vlans, ssh_device): # Check if we are changing access vlans or voice vlans. if toggle_voice_vlans: - command_text += f"sw acc vlan {new_vlan}\n" - else: command_text += f"sw voice vlan {new_vlan}\n" + else: + command_text += f"sw acc vlan {new_vlan}\n" # Clear interface_range text and reset counter. @@ -97,9 +97,9 @@ def change_port_vlans(old_vlan, new_vlan, toggle_voice_vlans, ssh_device): command_text += f"int range {interface_ranges}\n" # Check if we are changing access vlans or voice vlans. if toggle_voice_vlans: - command_text += f"sw acc vlan {new_vlan}\n" - else: command_text += f"sw voice vlan {new_vlan}\n" + else: + command_text += f"sw acc vlan {new_vlan}\n" # Add end to exit global config mode. command_text += "end\n" From 5c4cf606e5d395523bbb9354e9b285bf64064ec6 Mon Sep 17 00:00:00 2001 From: ClayJay3 Date: Thu, 22 Jun 2023 09:02:24 -0500 Subject: [PATCH 3/9] Error handling for commands that take extra long to run. --- interface/main_window.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/interface/main_window.py b/interface/main_window.py index 73e09fd..1110cb7 100644 --- a/interface/main_window.py +++ b/interface/main_window.py @@ -747,6 +747,8 @@ def deploy_button_back_process(self, command_text, devices, usernames, passwords except ReadTimeout: self.logger.warning(f"Couldn't get command output for {device['ip_addr']}. It is likely the commands still ran.") messagebox.showwarning(message=f"Couldn't get command output for {device['ip_addr']}. However, it is likely the commands still ran and the console just took too long to print output.") + except Exception as e: + self.logger.error(f"Netmiko ERROR: {e}") # Check if the user has enabled vlan changing. if self.change_vlan_check.get(): From a080881f25485c321fd406718f68c099118aa1ac Mon Sep 17 00:00:00 2001 From: ClayJay3 Date: Thu, 22 Jun 2023 16:34:46 -0500 Subject: [PATCH 4/9] Began threading deploy process and fixed device deploy counter. --- interface/main_window.py | 534 ++++++++++++++++++++++----------------- 1 file changed, 302 insertions(+), 232 deletions(-) diff --git a/interface/main_window.py b/interface/main_window.py index 1110cb7..09d3731 100644 --- a/interface/main_window.py +++ b/interface/main_window.py @@ -46,6 +46,19 @@ def __init__(self) -> None: self.already_auto_discovering = False self.export_permission_error = False + # Deploy vars. + self.already_deploying = False + self.directory_name = "" + self.selected_devices = [] + self.usernames = [] + self.passwords = [] + self.enable_secrets = [] + self.command_text = "" + self.user_result = False + self.bad_deploys = [] + self.deploy_devices_total_count = 0 + self.deploy_frames_state_enabled = True + # Open log file for displaying in console window. self.log_file = open("logs/latest.log", "r", encoding="utf-8") # Create cache file. @@ -147,10 +160,10 @@ def initialize_window(self) -> None: console_frame.rowconfigure(self.grid_size, weight=1) console_frame.columnconfigure(self.grid_size, weight=1) # Create frame for entering quick command. - options_frame = tk.Frame(master=self.window, relief=tk.GROOVE, borderwidth=3) - options_frame.grid(row=9, rowspan=1, column=5, columnspan=5, sticky=tk.NSEW) - options_frame.rowconfigure(self.grid_size, weight=1) - options_frame.columnconfigure(self.grid_size, weight=1) + self.options_frame = tk.Frame(master=self.window, relief=tk.GROOVE, borderwidth=3) + self.options_frame.grid(row=9, rowspan=1, column=5, columnspan=5, sticky=tk.NSEW) + self.options_frame.rowconfigure(self.grid_size, weight=1) + self.options_frame.columnconfigure(self.grid_size, weight=1) # Populate title frame. greeting = tk.Label(master=title_frame, text="Welcome to NetCommander!", font=(self.font, 18)) @@ -235,10 +248,10 @@ def initialize_window(self) -> None: ######################################################################################################### # Populate option frame. ######################################################################################################### - options_label = tk.Label(master=options_frame, text="Options:", font=(self.font, 14)) + options_label = tk.Label(master=self.options_frame, text="Options:", font=(self.font, 14)) options_label.grid(row=0, column=0, columnspan=10, sticky=tk.NSEW) ######################################################################################################### - vlan_options_frame = tk.Frame(master=options_frame, relief=tk.GROOVE, borderwidth=2) # Create frame from grouping vlan options. + vlan_options_frame = tk.Frame(master=self.options_frame, relief=tk.GROOVE, borderwidth=2) # Create frame from grouping vlan options. vlan_options_frame.grid(row=1, rowspan=2, column=0, columnspan=5, sticky=tk.NSEW) vlan_options_frame.rowconfigure(self.grid_size, weight=1) vlan_options_frame.columnconfigure(self.grid_size, weight=1) @@ -257,7 +270,7 @@ def initialize_window(self) -> None: self.voice_vlan_radio_select = tk.Radiobutton(master=vlan_options_frame, text="Change voice VLANs", variable=self.toggle_voice_vlan_check, value=True) self.voice_vlan_radio_select.grid(row=1, column=3, columnspan=6, sticky=tk.W) ######################################################################################################### - turbo_options_frame = tk.Frame(master=options_frame, relief=tk.GROOVE, borderwidth=2) # Create frame from grouping turbo options. + turbo_options_frame = tk.Frame(master=self.options_frame, relief=tk.GROOVE, borderwidth=2) # Create frame from grouping turbo options. turbo_options_frame.grid(row=3, rowspan=2, column=0, columnspan=5, sticky=tk.NSEW) turbo_options_frame.rowconfigure(self.grid_size, weight=1) turbo_options_frame.columnconfigure(self.grid_size, weight=1) @@ -266,7 +279,7 @@ def initialize_window(self) -> None: turbo_deploy_text = tk.Label(master=turbo_options_frame, text="Enable Turbo Deploy? (WARNING: Doesn't ask user for confirmation)", font=(self.font, 10)) turbo_deploy_text.grid(row=0, column=1, columnspan=5, sticky=tk.W) ######################################################################################################### - dhcp_arp_options_frame = tk.Frame(master=options_frame, relief=tk.GROOVE, borderwidth=2) # Create frame from grouping turbo options. + dhcp_arp_options_frame = tk.Frame(master=self.options_frame, relief=tk.GROOVE, borderwidth=2) # Create frame from grouping turbo options. dhcp_arp_options_frame.grid(row=5, rowspan=2, column=0, columnspan=5, sticky=tk.NSEW) dhcp_arp_options_frame.rowconfigure(self.grid_size, weight=1) dhcp_arp_options_frame.columnconfigure(self.grid_size, weight=1) @@ -619,8 +632,9 @@ def auto_discover_back_process(self, text, usernames, passwords, enable_secrets, def deploy_button_callback(self) -> None: """ - This function is triggered everytime the Deploy button is pressed. The process for this button click triggers - a new thread that send the given commands to all selected devices. + This function is triggered everytime the Deploy button is pressed and continues executing until the + deploy is finished. The process for this button click triggers a new thread that send the given + commands to all selected devices. Parameters: ----------- @@ -629,116 +643,229 @@ def deploy_button_callback(self) -> None: Returns: -------- Nothing - """ - # Create deploy output directory. - directory_name = f"deploy_outputs/{datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}" - os.makedirs(directory_name, exist_ok=True) - - # Loop through each boolean value and match it to the device ip. - selected_devices = [] - for selected, device in zip(self.switch_tree_check_values, self.selection_devices_list): - # Get the selected devices status and append them to a list if true. - if selected.get(): - # Insert object at beginning of list. This effectively reverses the list, so we can just go through is from the beginning when deploying commands. - selected_devices.insert(0, device) - - # Get username and password lists. - usernames = [username.get() for username in self.username_entrys] - passwords = [password.get() for password in self.password_entrys] - enable_secrets = [secret.get() for secret in self.secret_entrys] - # Remove empty passwords. - for i, password in enumerate(passwords): - # Check length. - if len(password) <= 0: - # Remove list item. - usernames.pop(i) - passwords.pop(i) - enable_secrets.pop(i) - - # Get commands from textbox. - text = self.command_textbox.get('1.0', tk.END) - - # Check if selected devices list is empty. - if len(selected_devices) > 0: - # Check to make sure the user entered something for the vlan change if selected. - if self.change_vlan_check.get() and len(self.vlan_old_entry.get()) <= 0 and len(self.vlan_new_entry.get()) <= 0: - # Print log and show messagebox. - self.logger.warning("Can't start deploy because no vlans have been entered to change! If you don't want to enter vlans, then uncheck the box in the options pane.") - messagebox.showwarning(message="Can't start deploy because the user didn't specify old and new vlans even though the checkbox was ticked.") - elif self.dhcp_snooping_check.get() and len(self.dhcp_snoop_vlan_entry.get()) <= 0: - # Print log and show messagebox. - self.logger.warning("Can't start deploy because no vlans have been entered to snoop on!") - messagebox.showwarning(message="Can't start deploy because no vlans have been entered to snoop on! You must enter vlans like: 1,2-3,4") - if self.arp_inspection_check.get() and len(self.arp_inspection_vlan_entry.get()) <= 0: + """ + # Check if a deploy has already been started. + if not self.already_deploying: + # Clear class deploy vars. + self.already_deploying = False + self.directory_name = "" + self.selected_devices = [] + self.usernames = [] + self.passwords = [] + self.enable_secrets = [] + self.command_text = "" + self.user_result = False + self.bad_deploys = [] + self.deploy_devices_total_count = 0 + + # Create deploy output directory. + self.directory_name = f"deploy_outputs/{datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}" + os.makedirs(self.directory_name, exist_ok=True) + + # Loop through each boolean value and match it to the device ip. + self.selected_devices = [] + for selected, device in zip(self.switch_tree_check_values, self.selection_devices_list): + # Get the selected devices status and append them to a list if true. + if selected.get(): + # Insert object at beginning of list. This effectively reverses the list, so we can just go through is from the beginning when deploying commands. + self.selected_devices.insert(0, device) + # Count number of deploy devices. + self.deploy_devices_total_count += 1 + + # Get username and password lists. + self.usernames = [username.get() for username in self.username_entrys] + self.passwords = [password.get() for password in self.password_entrys] + self.enable_secrets = [secret.get() for secret in self.secret_entrys] + # Remove empty passwords. + for i, password in enumerate(self.passwords): + # Check length. + if len(password) <= 0: + # Remove list item. + self.usernames.pop(i) + self.passwords.pop(i) + self.enable_secrets.pop(i) + + # Get commands from textbox. + self.command_text = self.command_textbox.get('1.0', tk.END) + + # Check if selected devices list is empty. + if len(self.selected_devices) > 0: + # Check to make sure the user entered something for the vlan change if selected. + if self.change_vlan_check.get() and len(self.vlan_old_entry.get()) <= 0 and len(self.vlan_new_entry.get()) <= 0: # Print log and show messagebox. - self.logger.warning("Can't start deploy because no vlans have been entered to do arp inspection on!") - messagebox.showwarning(message="Can't start deploy because no vlans have been entered to do arp inspection on! You must enter vlans like: 1,2-3,4") + self.logger.warning("Can't start deploy because no vlans have been entered to change! If you don't want to enter vlans, then uncheck the box in the options pane.") + messagebox.showwarning(message="Can't start deploy because the user didn't specify old and new vlans even though the checkbox was ticked.") + elif self.dhcp_snooping_check.get() and len(self.dhcp_snoop_vlan_entry.get()) <= 0: + # Print log and show messagebox. + self.logger.warning("Can't start deploy because no vlans have been entered to snoop on!") + messagebox.showwarning(message="Can't start deploy because no vlans have been entered to snoop on! You must enter vlans like: 1,2-3,4") + if self.arp_inspection_check.get() and len(self.arp_inspection_vlan_entry.get()) <= 0: + # Print log and show messagebox. + self.logger.warning("Can't start deploy because no vlans have been entered to do arp inspection on!") + messagebox.showwarning(message="Can't start deploy because no vlans have been entered to do arp inspection on! You must enter vlans like: 1,2-3,4") + else: + # Before starting threads, ask user if they are sure they want to continue. + self.logger.info("Asking user if they are sure they want to start the deploy process...") + self.user_result = messagebox.askyesno(title="ATTENTION!", message="Are you sure you want to start the deploy process? This will make changes to the devices!") + # Display warning messages about DHCP Snooping. + if self.dhcp_snooping_check.get() and self.user_result: + self.logger.warning("Asking user if they are ABSOLUTELY SURE they want to enable DHCP snooping on switches...") + self.user_result = messagebox.askyesno(message="WARNING!!! By enabling DHCP snooping all dhcp offers from non-trunk ports will be blocked. After enabling this feature, you must manually log into the switch connected to your DHCP server and run 'ip dhcp snooping trust' on the switchport interface. If you don't, then no hosts will be able to receive a new DHCP address! ARE YOU ABSOLUTELY SURE YOU WANT TO CONTINUE?") + if self.arp_inspection_check.get() and self.user_result: + self.logger.warning("Asking user if they are ABSOLUTELY SURE they want to enable ARP inspection on switches...") + self.user_result = messagebox.askyesno(message="WARNING!!! By enabling ARP snooping all non DHCP devices will be blocked from connecting to the network! After enabling this feature, you must manually log into EVERY SWITCH and run 'ip arp inspection trust' on the ports you trust or add a manual binding in the 'ip source' table. If you don't, then any hosts with a static IP will not be able to join the network! ARE YOU ABSOLUTELY SURE YOU WANT TO CONTINUE?") else: - # Before starting threads, ask user if they are sure they want to continue. - self.logger.info("Asking user if they are sure they want to start the deploy process...") - user_result = messagebox.askyesno(title="ATTENTION!", message="Are you sure you want to start the deploy process? This will make changes to the devices!") - # Display warning messages about DHCP Snooping. - if self.dhcp_snooping_check.get() and user_result: - self.logger.warning("Asking user if they are ABSOLUTELY SURE they want to enable DHCP snooping on switches...") - user_result = messagebox.askyesno(message="WARNING!!! By enabling DHCP snooping all dhcp offers from non-trunk ports will be blocked. After enabling this feature, you must manually log into the switch connected to your DHCP server and run 'ip dhcp snooping trust' on the switchport interface. If you don't, then no hosts will be able to receive a new DHCP address! ARE YOU ABSOLUTELY SURE YOU WANT TO CONTINUE?") - if self.arp_inspection_check.get() and user_result: - self.logger.warning("Asking user if they are ABSOLUTELY SURE they want to enable ARP inspection on switches...") - user_result = messagebox.askyesno(message="WARNING!!! By enabling ARP snooping all non DHCP devices will be blocked from connecting to the network! After enabling this feature, you must manually log into EVERY SWITCH and run 'ip arp inspections trust' on the ports you trust or add a manual binding in the 'ip source' table. If you don't, then any hosts with a static IP will not be able to join the network! ARE YOU ABSOLUTELY SURE YOU WANT TO CONTINUE?") - - # Check user choice. - if user_result: - # Print log. + # Print log and show messagebox. + self.logger.warning("Can't start deploy because no devices have been selected.") + messagebox.showwarning(message="Can't start deploy because no devices have been selected.") + + # Check user is sure they want to start deploy. + if self.user_result: + # Check if commands are empty. + if len(self.command_text) > 1: + if not self.already_deploying: + # Toggle that deploy has been started and print log. + self.already_deploying = True self.logger.info("Command deploy has been started!") - # Start a new thread that connects to each ip and runs the given commands. - # Thread(target=self.deploy_button_back_process, args=(text, usernames, passwords, enable_secrets, self.enable_telnet_check.get(), self.force_telnet_check.get())).start() - bad_deploys = self.deploy_button_back_process(text, selected_devices, usernames, passwords, enable_secrets, self.enable_telnet_check.get(), self.force_telnet_check.get(), directory_name) + # Set window size. Take it out of fullscreen. + self.window.wm_state('iconic') + # Print log. + self.logger.info(f"NetCommander will be running these commands on the selected switches: \n{self.command_text}") + + # Get the next up switch. + device = self.selected_devices.pop(0) + exit_messages = {"info": "", "warning": "", "error": "", "critical": ""} + output = "test" + # Start a new thread that connects to each ip and runs the given commands. + # exit_messages, output = self.deploy_button_back_process(text, device, self.usernames, self.passwords, self.enable_secrets, self.enable_telnet_check.get(), self.force_telnet_check.get()) + + # Show the output to the user and ask if it is correct. + text_popup(f"Command Output for {device['hostname']}, {device['ip_addr']}", output, x_grid_size=10, y_grid_size=10) + # Write output to a file. + with open(f"{self.directory_name}/{device['hostname']}({device['ip_addr']}).txt", 'w+') as file: + for line in output: + file.write(line) + + # Check if turbo deploy is enabled. + if self.turbo_deploy_check.get(): + correct_output = True + continue_deploy = True + else: + # Ask the user if the output is correct. + correct_output = messagebox.askyesno(title=f"Confirm correct output for {device['hostname']}, {device['ip_addr']}", message="Is this output correct? Its output will be saved to the deploy_outputs folder.") + # Ask the user if they want to continue. + continue_deploy = messagebox.askyesno(title="Continue deploy?", message="Would you like to continue the command deploy?") + + # If the output was incorrect add the switch to a list. + if not correct_output: + self.bad_deploys.append(device) + # If the user doesn't want to continue the deploy then stop looping. + if not continue_deploy: + # Print log. + self.logger.info("The deploy has been canceled by the user.") + # Check if the command deployment is done. + if len(self.selected_devices) <= 0 and self.already_deploying or not continue_deploy: + # Print bad deploy devices to logs. + self.logger.info(f"Unable to fully deploy commands to these devices: {self.bad_deploys}") # Print log and show messagebox stating the deploy has finished. - self.logger.info(f"The command deploy has finished! {len(bad_deploys)} out of {len(selected_devices)} did not successfully execute the given commands. Opening window with the IPs now...") - messagebox.showinfo(message=f"The command deploy has finished! {len(bad_deploys)} out of {len(selected_devices)} did not successfully execute the given commands.") + self.logger.info(f"The command deploy has finished! {len(self.bad_deploys)} out of {self.deploy_devices_total_count} did not successfully execute the given commands. Opening window with the IPs now...") + messagebox.showinfo(message=f"The command deploy has finished! {len(self.bad_deploys)} out of {self.deploy_devices_total_count} did not successfully execute the given commands.") # Check if we need to open the window. - if len(bad_deploys) > 0: - text_popup(title="Bad Deploy Devices", text=[f"{device['ip_addr']} - {device['hostname']}\n" for device in bad_deploys]) - else: - # Print log. - self.logger.info("Command deploy has been canceled.") + if len(self.bad_deploys) > 0: + text_popup(title="Bad Deploy Devices", text=[f"{device['ip_addr']} - {device['hostname']}\n" for device in self.bad_deploys]) + + # Reset deploy toggle. + self.already_deploying = False + else: + # Print log and show messagebox to user. + self.logger.info("Command textbox is empty!") + messagebox.showwarning(title="Warning!", message="Command textbox is empty!") else: - # Print log and show messagebox. - self.logger.warning("Can't start deploy because no devices have been selected.") - messagebox.showwarning(message="Can't start deploy because no devices have been selected.") + # Print log. + self.logger.info("Command deploy has been canceled.") - def deploy_button_back_process(self, command_text, devices, usernames, passwords, enable_secrets, enable_telnet, force_telnet, directory_name="deploy_outputs/") -> None: + def deploy_button_back_process(self, command_text, device, usernames, passwords, enable_secrets, enable_telnet, force_telnet) -> None: """ - Helper function for Deploy Button, runs in a new thread. + Helper function for Deploy Button, deploys commands to a single device in a new thread. """ # Create instance variables. - bad_deploys = [] + exit_messages = {"info": "", "warning": "", "error": "", "critical": ""} - # Set window size. Take it out of fullscreen. - self.window.wm_state('iconic') + # Attempt to login to the given device. + ssh_device = ssh_autodetect_info(usernames, passwords, enable_secrets, enable_telnet, force_telnet, device["ip_addr"]) + connection = ssh_telnet(ssh_device, enable_telnet, force_telnet, store_config_info=True) + + try: + # Check if device connection was successful. + if connection is not None and connection.is_alive(): + # Check the privilege level of our connection. Must be 15 to execute all commands. + if "15" in connection.send_command("show privilege"): + # Print log. + self.logger.info(f"Connected to device {device['ip_addr']}. Running commands...") + + output = "" + # Run the commands on the switch and show output, then ask the user if the output looks good. + for line in command_text.splitlines(): + # Catch timeouts. + try: + # Send the current command to the switch. + output += f"\n{connection.find_prompt()}{line}\n" + output += connection.send_command(line, expect_string="#") + except ReadTimeout: + self.logger.warning(f"Couldn't get command output for {device['ip_addr']}. It is likely the commands still ran.") + exit_messages["warning"] += f"Couldn't get command output for {device['ip_addr']} running command {line}. However, it is likely the command still ran and the console just took too long to print output.\n" + except Exception as e: + self.logger.error(f"Netmiko ERROR: {e}") + + # Check if the user has enabled vlan changing. + if self.change_vlan_check.get(): + # Get vlan new and old numbers from the user. + vlan_command_text = change_port_vlans(self.vlan_old_entry.get(), self.vlan_new_entry.get(), self.toggle_voice_vlan_check.get(), ssh_device) + + # Run the commands on the switch and show output, then ask the user if the output looks good. + output += "\n\n" + for line in vlan_command_text.splitlines(): + # Catch timeouts. + try: + # Send the current command to the switch. + output += f"\n{connection.find_prompt()}{line}\n" + output += connection.send_command(line, expect_string="#") + except ReadTimeout: + self.logger.warning(f"Couldn't get command output for {device['ip_addr']}. It is likely the commands still ran.") + exit_messages["warning"] += f"Couldn't get command output for {device['ip_addr']} running command {line}. However, it is likely the command still ran and the console just took too long to print output.\n" + + # Append some newlines to the output to keep it pretty. + output += "\n\n" + + # Check if the user has enabled DHCP snooping option. + if self.dhcp_snooping_check.get(): + # Get vlan new and old numbers from the user. + vlan_command_text = setup_dhcp_snooping_on_trunks(ssh_device, self.dhcp_snoop_vlan_entry.get(), self.dhcp_snooping_option82_check.get()) + + # Run the commands on the switch and show output, then ask the user if the output looks good. + output += "\n\n" + for line in vlan_command_text.splitlines(): + # Catch timeouts. + try: + # Send the current command to the switch. + output += f"\n{connection.find_prompt()}{line}\n" + output += connection.send_command(line, expect_string="#") + except ReadTimeout: + self.logger.warning(f"Couldn't get command output for {device['ip_addr']}. It is likely the commands still ran.") + exit_messages["warning"] += f"Couldn't get command output for {device['ip_addr']} running command {line}. However, it is likely the command still ran and the console just took too long to print output.\n" + + # Append some newlines to the output to keep it pretty. + output += "\n\n" + + # Check if the user has enabled ARP inspection option. + if self.arp_inspection_check.get(): + # Get vlan new and old numbers from the user. + vlan_command_text = setup_dynamic_arp_inspection_on_trunks(ssh_device, self.arp_inspection_vlan_entry.get()) - # Print log. - self.logger.info(f"NetCommander will be running these commands on the selected switches: \n{command_text}") - - # Loop through all selected devices. - for device in devices: - # Attempt to login to the first device. - ssh_device = ssh_autodetect_info(usernames, passwords, enable_secrets, enable_telnet, force_telnet, device["ip_addr"]) - connection = ssh_telnet(ssh_device, enable_telnet, force_telnet, store_config_info=True) - - try: - # Check if device connection was successful. - if connection is not None and connection.is_alive(): - # Check the privilege level of our connection. Must be 15 to execute all commands. - if "15" in connection.send_command("show privilege"): - # Check if commands are empty. - if len(command_text) > 1: - # Print log. - self.logger.info(f"Connected to device {device['ip_addr']}. Running commands...") - - output = "" # Run the commands on the switch and show output, then ask the user if the output looks good. - for line in command_text.splitlines(): + output += "\n\n" + for line in vlan_command_text.splitlines(): # Catch timeouts. try: # Send the current command to the switch. @@ -746,136 +873,24 @@ def deploy_button_back_process(self, command_text, devices, usernames, passwords output += connection.send_command(line, expect_string="#") except ReadTimeout: self.logger.warning(f"Couldn't get command output for {device['ip_addr']}. It is likely the commands still ran.") - messagebox.showwarning(message=f"Couldn't get command output for {device['ip_addr']}. However, it is likely the commands still ran and the console just took too long to print output.") - except Exception as e: - self.logger.error(f"Netmiko ERROR: {e}") - - # Check if the user has enabled vlan changing. - if self.change_vlan_check.get(): - # Get vlan new and old numbers from the user. - vlan_command_text = change_port_vlans(self.vlan_old_entry.get(), self.vlan_new_entry.get(), self.toggle_voice_vlan_check.get(), ssh_device) - - # Run the commands on the switch and show output, then ask the user if the output looks good. - output += "\n\n" - for line in vlan_command_text.splitlines(): - # Catch timeouts. - try: - # Send the current command to the switch. - output += f"\n{connection.find_prompt()}{line}\n" - output += connection.send_command(line, expect_string="#") - except ReadTimeout: - self.logger.warning(f"Couldn't get command output for {device['ip_addr']}. It is likely the commands still ran.") - messagebox.showwarning(message=f"Couldn't get command output for {device['ip_addr']}. However, it is likely the commands still ran and the console just took too long to print output.") - - # Append some newlines to the output to keep it pretty. - output += "\n\n" - - # Check if the user has enabled DHCP snooping option. - if self.dhcp_snooping_check.get(): - # Get vlan new and old numbers from the user. - vlan_command_text = setup_dhcp_snooping_on_trunks(ssh_device, self.dhcp_snoop_vlan_entry.get(), self.dhcp_snooping_option82_check.get()) - - # Run the commands on the switch and show output, then ask the user if the output looks good. - output += "\n\n" - for line in vlan_command_text.splitlines(): - # Catch timeouts. - try: - # Send the current command to the switch. - output += f"\n{connection.find_prompt()}{line}\n" - output += connection.send_command(line, expect_string="#") - except ReadTimeout: - self.logger.warning(f"Couldn't get command output for {device['ip_addr']}. It is likely the commands still ran.") - messagebox.showwarning(message=f"Couldn't get command output for {device['ip_addr']}. However, it is likely the commands still ran and the console just took too long to print output.") - - # Append some newlines to the output to keep it pretty. - output += "\n\n" - - # Check if the user has enabled ARP inspection option. - if self.arp_inspection_check.get(): - # Get vlan new and old numbers from the user. - vlan_command_text = setup_dynamic_arp_inspection_on_trunks(ssh_device, self.arp_inspection_vlan_entry.get()) - - # Run the commands on the switch and show output, then ask the user if the output looks good. - output += "\n\n" - for line in vlan_command_text.splitlines(): - # Catch timeouts. - try: - # Send the current command to the switch. - output += f"\n{connection.find_prompt()}{line}\n" - output += connection.send_command(line, expect_string="#") - except ReadTimeout: - self.logger.warning(f"Couldn't get command output for {device['ip_addr']}. It is likely the commands still ran.") - messagebox.showwarning(message=f"Couldn't get command output for {device['ip_addr']}. However, it is likely the commands still ran and the console just took too long to print output.") - - # Append some newlines to the output to keep it pretty. - output += "\n\n" - - # Show the output to the user and ask if it is correct. - text_popup(f"Command Output for {device['hostname']}, {device['ip_addr']}", output, x_grid_size=10, y_grid_size=10) - # Write output to a file. - with open(f"{directory_name}/{device['hostname']}({device['ip_addr']}).txt", 'w+') as file: - for line in output: - file.write(line) - - # Check if turbo deploy is enabled. - if self.turbo_deploy_check.get(): - correct_output = True - continue_deploy = True - else: - # Ask the user if the output is correct. - correct_output = messagebox.askyesno(title=f"Confirm correct output for {device['hostname']}, {device['ip_addr']}", message="Is this output correct? Its output will be saved to the deploy_outputs folder.") - # Ask the user if they want to continue. - continue_deploy = messagebox.askyesno(title="Continue deploy?", message="Would you like to continue the command deploy?") - - # If the output was incorrect add the switch to a list. - if not correct_output: - bad_deploys.append(device) - # If the user doesn't want to continue the deploy then stop looping. - if not continue_deploy: - # Print log. - self.logger.info("The deploy has been canceled by the user.") - # Disconnection from the current device. - connection.disconnect() - # Exit for loop. - break - else: - # Print log and show messagebox to user. - self.logger.info("Command textbox is empty!") - messagebox.showwarning(message="Command textbox is empty!") - # Store all device in the bad deploy list. - bad_deploys = devices - # Disconnection from the current device. - connection.disconnect() - # Exit for loop. - break - else: - # Print log and show messagebox. - self.logger.error(f"Insignificant privilege level to safely run all commands on {device['ip_addr']}. Skipping and adding to bad deploy list...") - messagebox.showerror(message=f"Insignificant privilege level to safely run all commands on {device['ip_addr']}. The device will be skipped and marked as a bad deploy.") - # Append current device to bad deploy list. - bad_deploys.append(device) + exit_messages["warning"] += f"Couldn't get command output for {device['ip_addr']} running command {line}. However, it is likely the command still ran and the console just took too long to print output.\n" - # Disconnection from device. - connection.disconnect() + # Append some newlines to the output to keep it pretty. + output += "\n\n" else: # Print log and show messagebox. - self.logger.error(f"Failed to connection to {device['ip_addr']}") - messagebox.showerror(message=f"Couldn't connect to {device['ip_addr']}. Moving on to next device.") - # Append current device to bad deploy list. - bad_deploys.append(device) - - # Update main window. - self.window.update() - except OSError: - self.logger.warning(f"Couldn't get command output for {device['ip_addr']}. Paramiko reported the socket as being closed. It is recommended that you rerun your commands on this switch!") - messagebox.showwarning(message=f"Couldn't get command output for {device['ip_addr']}. Paramiko reported the socket as being closed. It is recommended that you rerun your commands on this switch!") - # Append current device to bad deploy list. - bad_deploys.append(device) - - # Print bad deploy devices to logs. - self.logger.info(f"Unable to fully deploy commands to these devices: {bad_deploys}") + self.logger.error(f"Insignificant privilege level to safely run all commands on {device['ip_addr']}. Skipping and adding to bad deploy list...") + exit_messages[error] += f"Insignificant privilege level to safely run all commands on {device['ip_addr']}. The device will be skipped and marked as a bad deploy.\n" - return bad_deploys + # Disconnection from device. + connection.disconnect() + else: + # Print log and show messagebox. + self.logger.error(f"Failed to connection to {device['ip_addr']}") + exit_messages[error] += "Couldn't connect to {device['ip_addr']}. Moving on to next device.\n" + except OSError: + self.logger.warning(f"Couldn't get command output for {device['ip_addr']}. Paramiko reported the socket as being closed. It is recommended that you rerun your commands on this switch!") + exit_messages += "Couldn't get command output for {device['ip_addr']}. Paramiko reported the socket as being closed. It is recommended that you rerun your commands on this switch!\n" def mass_ping_button_callback(self) -> None: """ @@ -1064,6 +1079,25 @@ def update_window(self) -> None: self.tree_canvas.itemconfig('frame', height=self.tree_canvas.winfo_height()) self.tree_canvas.update_idletasks() + # Check if the deploy toggle is currently set. + if self.already_deploying: + # Check toggle. + if self.deploy_frames_state_enabled: + # Disable frames so user can't jack with the important controls. + self.disable_children(self.options_frame) + self.disable_children(self.creds_frame) + # Set toggle. + self.deploy_frames_state_enabled = False + + # If so, call the deploy callback function to handle threads and deploy logic. + self.deploy_button_callback() + elif not self.deploy_frames_state_enabled: + # Disable frames so user can't jack with the important controls. + self.enable_children(self.options_frame) + self.enable_children(self.creds_frame) + # Set toggle. + self.deploy_frames_state_enabled = True + # Call main window event loop. self.window.update() @@ -1102,6 +1136,42 @@ def close_window(self) -> None: # Close window. self.window.destroy() + def enable_children(self, parent): + """ + Enable all tkinter widgets contained within the given parent frame or window. + + Parameters: + ----------- + parent - The parent widget. + Returns: + -------- + Nothing + """ + for child in parent.winfo_children(): + wtype = child.winfo_class() + if wtype not in ('Frame','Labelframe','TFrame','TLabelframe'): + child.configure(state='normal') + else: + self.enable_children(child) + + def disable_children(self, parent): + """ + Disable all tkinter widgets contained within the given parent frame or window. + + Parameters: + ----------- + parent - The parent widget. + Returns: + -------- + Nothing + """ + for child in parent.winfo_children(): + wtype = child.winfo_class() + if wtype not in ('Frame','Labelframe','TFrame','TLabelframe'): + child.configure(state='disable') + else: + self.disable_children(child) + def get_is_window_open(self) -> bool: """ Returns if the window is still open and running. From 18b3c6bae428b90b6d3c1641719cd7a5853de722 Mon Sep 17 00:00:00 2001 From: ClayJay3 Date: Fri, 23 Jun 2023 09:40:07 -0500 Subject: [PATCH 5/9] Fixed widget enabling bug. --- interface/main_window.py | 28 +++++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/interface/main_window.py b/interface/main_window.py index 09d3731..6c6af96 100644 --- a/interface/main_window.py +++ b/interface/main_window.py @@ -734,11 +734,11 @@ def deploy_button_callback(self) -> None: # Get the next up switch. device = self.selected_devices.pop(0) - exit_messages = {"info": "", "warning": "", "error": "", "critical": ""} - output = "test" # Start a new thread that connects to each ip and runs the given commands. - # exit_messages, output = self.deploy_button_back_process(text, device, self.usernames, self.passwords, self.enable_secrets, self.enable_telnet_check.get(), self.force_telnet_check.get()) + exit_messages, output = self.deploy_button_back_process(self.command_text, device, self.usernames, self.passwords, self.enable_secrets, self.enable_telnet_check.get(), self.force_telnet_check.get()) + print(exit_messages) + # Show the output to the user and ask if it is correct. text_popup(f"Command Output for {device['hostname']}, {device['ip_addr']}", output, x_grid_size=10, y_grid_size=10) # Write output to a file. @@ -791,6 +791,7 @@ def deploy_button_back_process(self, command_text, device, usernames, passwords, """ # Create instance variables. exit_messages = {"info": "", "warning": "", "error": "", "critical": ""} + output = "" # Attempt to login to the given device. ssh_device = ssh_autodetect_info(usernames, passwords, enable_secrets, enable_telnet, force_telnet, device["ip_addr"]) @@ -804,7 +805,6 @@ def deploy_button_back_process(self, command_text, device, usernames, passwords, # Print log. self.logger.info(f"Connected to device {device['ip_addr']}. Running commands...") - output = "" # Run the commands on the switch and show output, then ask the user if the output looks good. for line in command_text.splitlines(): # Catch timeouts. @@ -892,6 +892,8 @@ def deploy_button_back_process(self, command_text, device, usernames, passwords, self.logger.warning(f"Couldn't get command output for {device['ip_addr']}. Paramiko reported the socket as being closed. It is recommended that you rerun your commands on this switch!") exit_messages += "Couldn't get command output for {device['ip_addr']}. Paramiko reported the socket as being closed. It is recommended that you rerun your commands on this switch!\n" + return exit_messages, output + def mass_ping_button_callback(self) -> None: """ This function is triggered everytime the Mass Ping button is pressed. The process for this button click triggers @@ -1095,8 +1097,24 @@ def update_window(self) -> None: # Disable frames so user can't jack with the important controls. self.enable_children(self.options_frame) self.enable_children(self.creds_frame) - # Set toggle. + # Set deploy frame toggle. self.deploy_frames_state_enabled = True + # Switch all check and entry toggles to be the opposite of what they are. THis makes the current widgets disable. + # Vlan change entries. + if not self.change_vlan_check.get(): + self.vlan_entries_state_enabled = True + else: + self.vlan_entries_state_enabled = False + # DHCP entries and checkboxes. + if not self.dhcp_snooping_check.get(): + self.dhcp_snoop_vlan_entry_state_enabled = True + else: + self.dhcp_snoop_vlan_entry_state_enabled = False + # ARP entry. + if not self.arp_inspection_check.get(): + self.arp_inspection_vlan_entry_state_enabled = True + else: + self.arp_inspection_vlan_entry_state_enabled = False # Call main window event loop. self.window.update() From 9fd591dad226f48e93b7348c6d1a4fdee350a83f Mon Sep 17 00:00:00 2001 From: ClayJay3 Date: Fri, 23 Jun 2023 13:53:02 -0500 Subject: [PATCH 6/9] Fixed some warning logic and deploy logic bugs. --- interface/main_window.py | 136 +++++++++++++++++++++------------------ 1 file changed, 72 insertions(+), 64 deletions(-) diff --git a/interface/main_window.py b/interface/main_window.py index 6c6af96..59c6da8 100644 --- a/interface/main_window.py +++ b/interface/main_window.py @@ -691,18 +691,30 @@ def deploy_button_callback(self) -> None: # Check if selected devices list is empty. if len(self.selected_devices) > 0: # Check to make sure the user entered something for the vlan change if selected. - if self.change_vlan_check.get() and len(self.vlan_old_entry.get()) <= 0 and len(self.vlan_new_entry.get()) <= 0: + if len(self.command_text) <= 1: + # Print log and show messagebox to user. + self.logger.info("Command textbox is empty!") + messagebox.showwarning(title="Warning!", message="Command textbox is empty!") + # Set toggle to keep deploy from continuing. + self.user_result = False + elif self.change_vlan_check.get() and (len(self.vlan_old_entry.get()) <= 0 or len(self.vlan_new_entry.get()) <= 0): # Print log and show messagebox. self.logger.warning("Can't start deploy because no vlans have been entered to change! If you don't want to enter vlans, then uncheck the box in the options pane.") messagebox.showwarning(message="Can't start deploy because the user didn't specify old and new vlans even though the checkbox was ticked.") + # Set toggle to keep deploy from continuing. + self.user_result = False elif self.dhcp_snooping_check.get() and len(self.dhcp_snoop_vlan_entry.get()) <= 0: # Print log and show messagebox. self.logger.warning("Can't start deploy because no vlans have been entered to snoop on!") messagebox.showwarning(message="Can't start deploy because no vlans have been entered to snoop on! You must enter vlans like: 1,2-3,4") - if self.arp_inspection_check.get() and len(self.arp_inspection_vlan_entry.get()) <= 0: - # Print log and show messagebox. - self.logger.warning("Can't start deploy because no vlans have been entered to do arp inspection on!") - messagebox.showwarning(message="Can't start deploy because no vlans have been entered to do arp inspection on! You must enter vlans like: 1,2-3,4") + # Set toggle to keep deploy from continuing. + self.user_result = False + elif self.arp_inspection_check.get() and len(self.arp_inspection_vlan_entry.get()) <= 0: + # Print log and show messagebox. + self.logger.warning("Can't start deploy because no vlans have been entered to do arp inspection on!") + messagebox.showwarning(message="Can't start deploy because no vlans have been entered to do arp inspection on! You must enter vlans like: 1,2-3,4") + # Set toggle to keep deploy from continuing. + self.user_result = False else: # Before starting threads, ask user if they are sure they want to continue. self.logger.info("Asking user if they are sure they want to start the deploy process...") @@ -718,69 +730,65 @@ def deploy_button_callback(self) -> None: # Print log and show messagebox. self.logger.warning("Can't start deploy because no devices have been selected.") messagebox.showwarning(message="Can't start deploy because no devices have been selected.") + # Set toggle to keep deploy from continuing. + self.user_result = False # Check user is sure they want to start deploy. if self.user_result: - # Check if commands are empty. - if len(self.command_text) > 1: - if not self.already_deploying: - # Toggle that deploy has been started and print log. - self.already_deploying = True - self.logger.info("Command deploy has been started!") - # Set window size. Take it out of fullscreen. - self.window.wm_state('iconic') - # Print log. - self.logger.info(f"NetCommander will be running these commands on the selected switches: \n{self.command_text}") - - # Get the next up switch. - device = self.selected_devices.pop(0) - # Start a new thread that connects to each ip and runs the given commands. - exit_messages, output = self.deploy_button_back_process(self.command_text, device, self.usernames, self.passwords, self.enable_secrets, self.enable_telnet_check.get(), self.force_telnet_check.get()) - - print(exit_messages) - - # Show the output to the user and ask if it is correct. - text_popup(f"Command Output for {device['hostname']}, {device['ip_addr']}", output, x_grid_size=10, y_grid_size=10) - # Write output to a file. - with open(f"{self.directory_name}/{device['hostname']}({device['ip_addr']}).txt", 'w+') as file: - for line in output: - file.write(line) - - # Check if turbo deploy is enabled. - if self.turbo_deploy_check.get(): - correct_output = True - continue_deploy = True - else: - # Ask the user if the output is correct. - correct_output = messagebox.askyesno(title=f"Confirm correct output for {device['hostname']}, {device['ip_addr']}", message="Is this output correct? Its output will be saved to the deploy_outputs folder.") - # Ask the user if they want to continue. - continue_deploy = messagebox.askyesno(title="Continue deploy?", message="Would you like to continue the command deploy?") - - # If the output was incorrect add the switch to a list. - if not correct_output: - self.bad_deploys.append(device) - # If the user doesn't want to continue the deploy then stop looping. - if not continue_deploy: - # Print log. - self.logger.info("The deploy has been canceled by the user.") - - # Check if the command deployment is done. - if len(self.selected_devices) <= 0 and self.already_deploying or not continue_deploy: - # Print bad deploy devices to logs. - self.logger.info(f"Unable to fully deploy commands to these devices: {self.bad_deploys}") - # Print log and show messagebox stating the deploy has finished. - self.logger.info(f"The command deploy has finished! {len(self.bad_deploys)} out of {self.deploy_devices_total_count} did not successfully execute the given commands. Opening window with the IPs now...") - messagebox.showinfo(message=f"The command deploy has finished! {len(self.bad_deploys)} out of {self.deploy_devices_total_count} did not successfully execute the given commands.") - # Check if we need to open the window. - if len(self.bad_deploys) > 0: - text_popup(title="Bad Deploy Devices", text=[f"{device['ip_addr']} - {device['hostname']}\n" for device in self.bad_deploys]) - - # Reset deploy toggle. - self.already_deploying = False + if not self.already_deploying: + # Toggle that deploy has been started and print log. + self.already_deploying = True + self.logger.info("Command deploy has been started!") + # Set window size. Take it out of fullscreen. + self.window.wm_state('iconic') + # Print log. + self.logger.info(f"NetCommander will be running these commands on the selected switches: \n{self.command_text}") + + # Get the next up switch. + device = self.selected_devices.pop(0) + # Start a new thread that connects to each ip and runs the given commands. + exit_messages, output = self.deploy_button_back_process(self.command_text, device, self.usernames, self.passwords, self.enable_secrets, self.enable_telnet_check.get(), self.force_telnet_check.get()) + + print(exit_messages) + + # Show the output to the user and ask if it is correct. + text_popup(f"Command Output for {device['hostname']}, {device['ip_addr']}", output, x_grid_size=10, y_grid_size=10) + # Write output to a file. + with open(f"{self.directory_name}/{device['hostname']}({device['ip_addr']}).txt", 'w+') as file: + for line in output: + file.write(line) + + # Check if turbo deploy is enabled. + if self.turbo_deploy_check.get(): + correct_output = True + continue_deploy = True else: - # Print log and show messagebox to user. - self.logger.info("Command textbox is empty!") - messagebox.showwarning(title="Warning!", message="Command textbox is empty!") + # Ask the user if the output is correct. + correct_output = messagebox.askyesno(title=f"Confirm correct output for {device['hostname']}, {device['ip_addr']}", message="Is this output correct? Its output will be saved to the deploy_outputs folder.") + # Ask the user if they want to continue. + continue_deploy = messagebox.askyesno(title="Continue deploy?", message="Would you like to continue the command deploy?") + + # If the output was incorrect add the switch to a list. + if not correct_output: + self.bad_deploys.append(device) + # If the user doesn't want to continue the deploy then stop looping. + if not continue_deploy: + # Print log. + self.logger.info("The deploy has been canceled by the user.") + + # Check if the command deployment is done. + if len(self.selected_devices) <= 0 and self.already_deploying or not continue_deploy: + # Print bad deploy devices to logs. + self.logger.info(f"Unable to fully deploy commands to these devices: {self.bad_deploys}") + # Print log and show messagebox stating the deploy has finished. + self.logger.info(f"The command deploy has finished! {len(self.bad_deploys)} out of {self.deploy_devices_total_count} did not successfully execute the given commands. Opening window with the IPs now...") + messagebox.showinfo(message=f"The command deploy has finished! {len(self.bad_deploys)} out of {self.deploy_devices_total_count} did not successfully execute the given commands.") + # Check if we need to open the window. + if len(self.bad_deploys) > 0: + text_popup(title="Bad Deploy Devices", text=[f"{device['ip_addr']} - {device['hostname']}\n" for device in self.bad_deploys]) + + # Reset deploy toggle. + self.already_deploying = False else: # Print log. self.logger.info("Command deploy has been canceled.") From ba264c9a0a97787d9cd67771d08f67263ad79550 Mon Sep 17 00:00:00 2001 From: ClayJay3 Date: Fri, 23 Jun 2023 17:04:20 -0500 Subject: [PATCH 7/9] Threading fixes --- interface/main_window.py | 133 ++++++++++++++++++++++----------------- 1 file changed, 76 insertions(+), 57 deletions(-) diff --git a/interface/main_window.py b/interface/main_window.py index 59c6da8..b4a082b 100644 --- a/interface/main_window.py +++ b/interface/main_window.py @@ -58,6 +58,10 @@ def __init__(self) -> None: self.bad_deploys = [] self.deploy_devices_total_count = 0 self.deploy_frames_state_enabled = True + self.exit_messages = [] + self.switch_output = [] + self.deploy_thread = None + self.deploy_device = None # Open log file for displaying in console window. self.log_file = open("logs/latest.log", "r", encoding="utf-8") @@ -292,7 +296,7 @@ def initialize_window(self) -> None: self.dhcp_snoop_vlan_entry.insert(0, "1,2-4,5") self.disable_dhcp_snooping_option82_checkbox = tk.Checkbutton(master=dhcp_arp_options_frame, variable=self.dhcp_snooping_option82_check, onvalue=True, offvalue=False) self.disable_dhcp_snooping_option82_checkbox.grid(row=1, rowspan=1, column=0, columnspan=1, sticky=tk.E) - dhcp_option82_text = tk.Label(master=dhcp_arp_options_frame, text="Disable option82 injection. (Improves compatibility with DCHP servers)", font=(self.font, 10)) + dhcp_option82_text = tk.Label(master=dhcp_arp_options_frame, text="Disable option82 injection. (Improves compatibility with DHCP servers)", font=(self.font, 10)) dhcp_option82_text.grid(row=1, column=1, columnspan=4, sticky=tk.W) self.enable_arp_inspection_checkbox = tk.Checkbutton(master=dhcp_arp_options_frame, variable=self.arp_inspection_check, onvalue=True, offvalue=False) self.enable_arp_inspection_checkbox.grid(row=2, rowspan=1, column=0, columnspan=1, sticky=tk.E) @@ -657,6 +661,10 @@ def deploy_button_callback(self) -> None: self.user_result = False self.bad_deploys = [] self.deploy_devices_total_count = 0 + self.exit_messages = {"info": "", "warning": "", "error": "", "critical": ""} + self.switch_output = [] + self.deploy_thread = None + self.deploy_device = None # Create deploy output directory. self.directory_name = f"deploy_outputs/{datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}" @@ -744,63 +752,73 @@ def deploy_button_callback(self) -> None: # Print log. self.logger.info(f"NetCommander will be running these commands on the selected switches: \n{self.command_text}") - # Get the next up switch. - device = self.selected_devices.pop(0) - # Start a new thread that connects to each ip and runs the given commands. - exit_messages, output = self.deploy_button_back_process(self.command_text, device, self.usernames, self.passwords, self.enable_secrets, self.enable_telnet_check.get(), self.force_telnet_check.get()) - - print(exit_messages) - - # Show the output to the user and ask if it is correct. - text_popup(f"Command Output for {device['hostname']}, {device['ip_addr']}", output, x_grid_size=10, y_grid_size=10) - # Write output to a file. - with open(f"{self.directory_name}/{device['hostname']}({device['ip_addr']}).txt", 'w+') as file: - for line in output: - file.write(line) - - # Check if turbo deploy is enabled. - if self.turbo_deploy_check.get(): - correct_output = True - continue_deploy = True - else: - # Ask the user if the output is correct. - correct_output = messagebox.askyesno(title=f"Confirm correct output for {device['hostname']}, {device['ip_addr']}", message="Is this output correct? Its output will be saved to the deploy_outputs folder.") - # Ask the user if they want to continue. - continue_deploy = messagebox.askyesno(title="Continue deploy?", message="Would you like to continue the command deploy?") - - # If the output was incorrect add the switch to a list. - if not correct_output: - self.bad_deploys.append(device) - # If the user doesn't want to continue the deploy then stop looping. - if not continue_deploy: - # Print log. - self.logger.info("The deploy has been canceled by the user.") - - # Check if the command deployment is done. - if len(self.selected_devices) <= 0 and self.already_deploying or not continue_deploy: - # Print bad deploy devices to logs. - self.logger.info(f"Unable to fully deploy commands to these devices: {self.bad_deploys}") - # Print log and show messagebox stating the deploy has finished. - self.logger.info(f"The command deploy has finished! {len(self.bad_deploys)} out of {self.deploy_devices_total_count} did not successfully execute the given commands. Opening window with the IPs now...") - messagebox.showinfo(message=f"The command deploy has finished! {len(self.bad_deploys)} out of {self.deploy_devices_total_count} did not successfully execute the given commands.") - # Check if we need to open the window. - if len(self.bad_deploys) > 0: - text_popup(title="Bad Deploy Devices", text=[f"{device['ip_addr']} - {device['hostname']}\n" for device in self.bad_deploys]) - - # Reset deploy toggle. - self.already_deploying = False + # Check if thread is finished. + if self.deploy_thread is None: + # Get the next up switch. + self.deploy_device = self.selected_devices.pop(0) + # Start a new thread that connects to each ip and runs the given commands. + self.deploy_thread = Thread(target=self.deploy_button_back_process, args=[self.command_text, self.deploy_device, self.usernames, self.passwords, self.enable_secrets, self.enable_telnet_check.get(), self.force_telnet_check.get(), self.change_vlan_check.get(), self.vlan_old_entry.get(), self.vlan_new_entry.get(), self.toggle_voice_vlan_check.get(), self.dhcp_snooping_check.get(), self.dhcp_snoop_vlan_entry.get(), self.dhcp_snooping_option82_check.get(), self.arp_inspection_check.get(), self.arp_inspection_vlan_entry.get(), self.exit_messages, self.switch_output]) + self.deploy_thread.start() + elif not self.deploy_thread.is_alive(): + # Close thread. + self.deploy_thread.join() + + print(self.exit_messages) + print(self.switch_output[0]) + + # Show the output to the user and ask if it is correct. + text_popup(f"Command Output for {self.deploy_device['hostname']}, {self.deploy_device['ip_addr']}", self.switch_output[0], x_grid_size=10, y_grid_size=10) + # Write output to a file. + with open(f"{self.directory_name}/{self.deploy_device['hostname']}({self.deploy_device['ip_addr']}).txt", 'w+') as file: + for line in self.switch_output[0]: + file.write(line) + + # Check if turbo deploy is enabled. + if self.turbo_deploy_check.get(): + correct_output = True + continue_deploy = True + else: + # Ask the user if the output is correct. + correct_output = messagebox.askyesno(title=f"Confirm correct output for {self.deploy_device['hostname']}, {self.deploy_device['ip_addr']}", message="Is this output correct? Its output will be saved to the deploy_outputs folder.") + # Ask the user if they want to continue. + continue_deploy = messagebox.askyesno(title="Continue deploy?", message="Would you like to continue the command deploy?") + + # If the output was incorrect add the switch to a list. + if not correct_output: + self.bad_deploys.append(self.deploy_device) + # If the user doesn't want to continue the deploy then stop looping. + if not continue_deploy: + # Print log. + self.logger.info("The deploy has been canceled by the user.") + + # Check if the command deployment is done. + if len(self.selected_devices) <= 0 and self.already_deploying or not continue_deploy: + # Print bad deploy devices to logs. + self.logger.info(f"Unable to fully deploy commands to these devices: {self.bad_deploys}") + # Print log and show messagebox stating the deploy has finished. + self.logger.info(f"The command deploy has finished! {len(self.bad_deploys)} out of {self.deploy_devices_total_count} did not successfully execute the given commands. Opening window with the IPs now...") + messagebox.showinfo(message=f"The command deploy has finished! {len(self.bad_deploys)} out of {self.deploy_devices_total_count} did not successfully execute the given commands.") + # Check if we need to open the window. + if len(self.bad_deploys) > 0: + text_popup(title="Bad Deploy Devices", text=[f"{self.deploy_device['ip_addr']} - {self.deploy_device['hostname']}\n" for self.deploy_device in self.bad_deploys]) + + # Reset deploy toggle. + self.already_deploying = False + else: + # Get the next up switch. + self.deploy_device = self.selected_devices.pop(0) + # Start a new thread that connects to each ip and runs the given commands. + self.deploy_thread = Thread(target=self.deploy_button_back_process, args=[self.command_text, self.deploy_device, self.usernames, self.passwords, self.enable_secrets, self.enable_telnet_check.get(), self.force_telnet_check.get(), self.change_vlan_check.get(), self.vlan_old_entry.get(), self.vlan_new_entry.get(), self.toggle_voice_vlan_check.get(), self.dhcp_snooping_check.get(), self.dhcp_snoop_vlan_entry.get(), self.dhcp_snooping_option82_check.get(), self.arp_inspection_check.get(), self.arp_inspection_vlan_entry.get(), self.exit_messages, self.switch_output]).start() else: # Print log. self.logger.info("Command deploy has been canceled.") - def deploy_button_back_process(self, command_text, device, usernames, passwords, enable_secrets, enable_telnet, force_telnet) -> None: + def deploy_button_back_process(self, command_text, device, usernames, passwords, enable_secrets, enable_telnet, force_telnet, change_vlan_check, vlan_old_entry, vlan_new_entry, toggle_voice_vlan_check, dhcp_snooping_check, dhcp_snoop_vlan_entry, dhcp_snooping_option82_check, arp_inspection_check, arp_inspection_vlan_entry, exit_messages, switch_output) -> None: """ Helper function for Deploy Button, deploys commands to a single device in a new thread. """ # Create instance variables. - exit_messages = {"info": "", "warning": "", "error": "", "critical": ""} output = "" - # Attempt to login to the given device. ssh_device = ssh_autodetect_info(usernames, passwords, enable_secrets, enable_telnet, force_telnet, device["ip_addr"]) connection = ssh_telnet(ssh_device, enable_telnet, force_telnet, store_config_info=True) @@ -827,9 +845,9 @@ def deploy_button_back_process(self, command_text, device, usernames, passwords, self.logger.error(f"Netmiko ERROR: {e}") # Check if the user has enabled vlan changing. - if self.change_vlan_check.get(): + if change_vlan_check: # Get vlan new and old numbers from the user. - vlan_command_text = change_port_vlans(self.vlan_old_entry.get(), self.vlan_new_entry.get(), self.toggle_voice_vlan_check.get(), ssh_device) + vlan_command_text = change_port_vlans(vlan_old_entry, vlan_new_entry, toggle_voice_vlan_check, ssh_device) # Run the commands on the switch and show output, then ask the user if the output looks good. output += "\n\n" @@ -847,9 +865,9 @@ def deploy_button_back_process(self, command_text, device, usernames, passwords, output += "\n\n" # Check if the user has enabled DHCP snooping option. - if self.dhcp_snooping_check.get(): + if dhcp_snooping_check: # Get vlan new and old numbers from the user. - vlan_command_text = setup_dhcp_snooping_on_trunks(ssh_device, self.dhcp_snoop_vlan_entry.get(), self.dhcp_snooping_option82_check.get()) + vlan_command_text = setup_dhcp_snooping_on_trunks(ssh_device, dhcp_snoop_vlan_entry, dhcp_snooping_option82_check) # Run the commands on the switch and show output, then ask the user if the output looks good. output += "\n\n" @@ -867,9 +885,9 @@ def deploy_button_back_process(self, command_text, device, usernames, passwords, output += "\n\n" # Check if the user has enabled ARP inspection option. - if self.arp_inspection_check.get(): + if arp_inspection_check: # Get vlan new and old numbers from the user. - vlan_command_text = setup_dynamic_arp_inspection_on_trunks(ssh_device, self.arp_inspection_vlan_entry.get()) + vlan_command_text = setup_dynamic_arp_inspection_on_trunks(ssh_device, arp_inspection_vlan_entry) # Run the commands on the switch and show output, then ask the user if the output looks good. output += "\n\n" @@ -900,7 +918,8 @@ def deploy_button_back_process(self, command_text, device, usernames, passwords, self.logger.warning(f"Couldn't get command output for {device['ip_addr']}. Paramiko reported the socket as being closed. It is recommended that you rerun your commands on this switch!") exit_messages += "Couldn't get command output for {device['ip_addr']}. Paramiko reported the socket as being closed. It is recommended that you rerun your commands on this switch!\n" - return exit_messages, output + # Append compiled output to passed in variable. + switch_output.append(output) def mass_ping_button_callback(self) -> None: """ @@ -1102,7 +1121,7 @@ def update_window(self) -> None: # If so, call the deploy callback function to handle threads and deploy logic. self.deploy_button_callback() elif not self.deploy_frames_state_enabled: - # Disable frames so user can't jack with the important controls. + # Reenabled frames. self.enable_children(self.options_frame) self.enable_children(self.creds_frame) # Set deploy frame toggle. From 3e69eebb82b42e2a25bfc67e2b62b344b006b837 Mon Sep 17 00:00:00 2001 From: ClayJay3 Date: Mon, 26 Jun 2023 14:55:25 -0500 Subject: [PATCH 8/9] Deploy threading works, fixed some logical bugs. --- interface/main_window.py | 80 +++++++++++++++++++++++++++++----------- 1 file changed, 58 insertions(+), 22 deletions(-) diff --git a/interface/main_window.py b/interface/main_window.py index b4a082b..99fca67 100644 --- a/interface/main_window.py +++ b/interface/main_window.py @@ -58,7 +58,7 @@ def __init__(self) -> None: self.bad_deploys = [] self.deploy_devices_total_count = 0 self.deploy_frames_state_enabled = True - self.exit_messages = [] + self.exit_messages = {} self.switch_output = [] self.deploy_thread = None self.deploy_device = None @@ -661,7 +661,7 @@ def deploy_button_callback(self) -> None: self.user_result = False self.bad_deploys = [] self.deploy_devices_total_count = 0 - self.exit_messages = {"info": "", "warning": "", "error": "", "critical": ""} + self.exit_messages = {"info": [], "warning": [], "error": [], "critical": []} self.switch_output = [] self.deploy_thread = None self.deploy_device = None @@ -756,6 +756,9 @@ def deploy_button_callback(self) -> None: if self.deploy_thread is None: # Get the next up switch. self.deploy_device = self.selected_devices.pop(0) + # Reset output variables. + self.exit_messages = {"info": [], "warning": [], "error": [], "critical": []} + self.switch_output = [] # Start a new thread that connects to each ip and runs the given commands. self.deploy_thread = Thread(target=self.deploy_button_back_process, args=[self.command_text, self.deploy_device, self.usernames, self.passwords, self.enable_secrets, self.enable_telnet_check.get(), self.force_telnet_check.get(), self.change_vlan_check.get(), self.vlan_old_entry.get(), self.vlan_new_entry.get(), self.toggle_voice_vlan_check.get(), self.dhcp_snooping_check.get(), self.dhcp_snoop_vlan_entry.get(), self.dhcp_snooping_option82_check.get(), self.arp_inspection_check.get(), self.arp_inspection_vlan_entry.get(), self.exit_messages, self.switch_output]) self.deploy_thread.start() @@ -764,22 +767,57 @@ def deploy_button_callback(self) -> None: self.deploy_thread.join() print(self.exit_messages) - print(self.switch_output[0]) - # Show the output to the user and ask if it is correct. - text_popup(f"Command Output for {self.deploy_device['hostname']}, {self.deploy_device['ip_addr']}", self.switch_output[0], x_grid_size=10, y_grid_size=10) - # Write output to a file. - with open(f"{self.directory_name}/{self.deploy_device['hostname']}({self.deploy_device['ip_addr']}).txt", 'w+') as file: - for line in self.switch_output[0]: - file.write(line) + # Check if we got any error messages. + show_output_box = True + if any(len(val[1]) > 0 for val in self.exit_messages.items()): + # Loop through keys in exit message dictionary. + for key in self.exit_messages.keys(): + # Check message type and show messagebox to user. + if key == "info": + # Show message to user. + for info_message in self.exit_messages[key]: + # Show info to user. + messagebox.showinfo(title="Info", message=info_message) + if key == "warning": + # Show message to user. + for warning_message in self.exit_messages[key]: + # Show info to user. + messagebox.showwarning(title="Warning", message=warning_message) + if key == "error": + # Show message to user. + for error_message in self.exit_messages[key]: + # Show info to user. + messagebox.showerror(title="Error", message=error_message) + # Set toggle to not show switch output. + show_output_box = False + if key == "critical": + # Show message to user. + for critical_message in self.exit_messages[key]: + # Show info to user. + messagebox.showcritical(title="CRITICAL", message=critical_message) + # Set toggle to not show switch output. + show_output_box = False + + # Check if we should still show output text and get confirmation from user. + if show_output_box: + # Show the output to the user and ask if it is correct. + text_popup(f"Command Output for {self.deploy_device['hostname']}, {self.deploy_device['ip_addr']}", self.switch_output[0], x_grid_size=10, y_grid_size=10) + # Write output to a file. + with open(f"{self.directory_name}/{self.deploy_device['hostname']}({self.deploy_device['ip_addr']}).txt", 'w+') as file: + for line in self.switch_output[0]: + file.write(line) # Check if turbo deploy is enabled. if self.turbo_deploy_check.get(): correct_output = True continue_deploy = True else: - # Ask the user if the output is correct. - correct_output = messagebox.askyesno(title=f"Confirm correct output for {self.deploy_device['hostname']}, {self.deploy_device['ip_addr']}", message="Is this output correct? Its output will be saved to the deploy_outputs folder.") + # Check if we should still show oyutput text and get confimation from user. + if show_output_box: + # Ask the user if the output is correct. + correct_output = messagebox.askyesno(title=f"Confirm correct output for {self.deploy_device['hostname']}, {self.deploy_device['ip_addr']}", message="Is this output correct? Its output will be saved to the deploy_outputs folder.") + # Ask the user if they want to continue. continue_deploy = messagebox.askyesno(title="Continue deploy?", message="Would you like to continue the command deploy?") @@ -805,10 +843,8 @@ def deploy_button_callback(self) -> None: # Reset deploy toggle. self.already_deploying = False else: - # Get the next up switch. - self.deploy_device = self.selected_devices.pop(0) - # Start a new thread that connects to each ip and runs the given commands. - self.deploy_thread = Thread(target=self.deploy_button_back_process, args=[self.command_text, self.deploy_device, self.usernames, self.passwords, self.enable_secrets, self.enable_telnet_check.get(), self.force_telnet_check.get(), self.change_vlan_check.get(), self.vlan_old_entry.get(), self.vlan_new_entry.get(), self.toggle_voice_vlan_check.get(), self.dhcp_snooping_check.get(), self.dhcp_snoop_vlan_entry.get(), self.dhcp_snooping_option82_check.get(), self.arp_inspection_check.get(), self.arp_inspection_vlan_entry.get(), self.exit_messages, self.switch_output]).start() + # Set deploy thread to None. + self.deploy_thread = None else: # Print log. self.logger.info("Command deploy has been canceled.") @@ -840,7 +876,7 @@ def deploy_button_back_process(self, command_text, device, usernames, passwords, output += connection.send_command(line, expect_string="#") except ReadTimeout: self.logger.warning(f"Couldn't get command output for {device['ip_addr']}. It is likely the commands still ran.") - exit_messages["warning"] += f"Couldn't get command output for {device['ip_addr']} running command {line}. However, it is likely the command still ran and the console just took too long to print output.\n" + exit_messages["warning"].append(f"Couldn't get command output for {device['ip_addr']} running command {line}. However, it is likely the command still ran and the console just took too long to print output.") except Exception as e: self.logger.error(f"Netmiko ERROR: {e}") @@ -859,7 +895,7 @@ def deploy_button_back_process(self, command_text, device, usernames, passwords, output += connection.send_command(line, expect_string="#") except ReadTimeout: self.logger.warning(f"Couldn't get command output for {device['ip_addr']}. It is likely the commands still ran.") - exit_messages["warning"] += f"Couldn't get command output for {device['ip_addr']} running command {line}. However, it is likely the command still ran and the console just took too long to print output.\n" + exit_messages["warning"].append(f"Couldn't get command output for {device['ip_addr']} running command {line}. However, it is likely the command still ran and the console just took too long to print output.") # Append some newlines to the output to keep it pretty. output += "\n\n" @@ -879,7 +915,7 @@ def deploy_button_back_process(self, command_text, device, usernames, passwords, output += connection.send_command(line, expect_string="#") except ReadTimeout: self.logger.warning(f"Couldn't get command output for {device['ip_addr']}. It is likely the commands still ran.") - exit_messages["warning"] += f"Couldn't get command output for {device['ip_addr']} running command {line}. However, it is likely the command still ran and the console just took too long to print output.\n" + exit_messages["warning"].append(f"Couldn't get command output for {device['ip_addr']} running command {line}. However, it is likely the command still ran and the console just took too long to print output.") # Append some newlines to the output to keep it pretty. output += "\n\n" @@ -899,24 +935,24 @@ def deploy_button_back_process(self, command_text, device, usernames, passwords, output += connection.send_command(line, expect_string="#") except ReadTimeout: self.logger.warning(f"Couldn't get command output for {device['ip_addr']}. It is likely the commands still ran.") - exit_messages["warning"] += f"Couldn't get command output for {device['ip_addr']} running command {line}. However, it is likely the command still ran and the console just took too long to print output.\n" + exit_messages["warning"].append(f"Couldn't get command output for {device['ip_addr']} running command {line}. However, it is likely the command still ran and the console just took too long to print output.") # Append some newlines to the output to keep it pretty. output += "\n\n" else: # Print log and show messagebox. self.logger.error(f"Insignificant privilege level to safely run all commands on {device['ip_addr']}. Skipping and adding to bad deploy list...") - exit_messages[error] += f"Insignificant privilege level to safely run all commands on {device['ip_addr']}. The device will be skipped and marked as a bad deploy.\n" + exit_messages["error"].append(f"Insignificant privilege level to safely run all commands on {device['ip_addr']}. The device will be skipped and marked as a bad deploy.") # Disconnection from device. connection.disconnect() else: # Print log and show messagebox. self.logger.error(f"Failed to connection to {device['ip_addr']}") - exit_messages[error] += "Couldn't connect to {device['ip_addr']}. Moving on to next device.\n" + exit_messages["error"].append(f"Couldn't connect to {device['ip_addr']}. Moving on to next device.") except OSError: self.logger.warning(f"Couldn't get command output for {device['ip_addr']}. Paramiko reported the socket as being closed. It is recommended that you rerun your commands on this switch!") - exit_messages += "Couldn't get command output for {device['ip_addr']}. Paramiko reported the socket as being closed. It is recommended that you rerun your commands on this switch!\n" + exit_messages["warning"].append("Couldn't get command output for {device['ip_addr']}. Paramiko reported the socket as being closed. It is recommended that you rerun your commands on this switch!") # Append compiled output to passed in variable. switch_output.append(output) From cd902c1d9b070daca26f390e6d7504520115012e Mon Sep 17 00:00:00 2001 From: ClayJay3 Date: Tue, 27 Jun 2023 09:32:29 -0500 Subject: [PATCH 9/9] Set min windows size and changed vlan logic. --- interface/main_window.py | 4 ++-- utils/deploy_options.py | 13 ++++++++++--- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/interface/main_window.py b/interface/main_window.py index 99fca67..29f0e82 100644 --- a/interface/main_window.py +++ b/interface/main_window.py @@ -120,6 +120,8 @@ def initialize_window(self) -> None: self.window.attributes("-topmost", True) self.window.update() self.window.attributes("-topmost", False) + # Set minimum size of window. + self.window.minsize(1068, 831) # Create checkbox variables. self.enable_telnet_check = tk.BooleanVar(self.window) @@ -766,8 +768,6 @@ def deploy_button_callback(self) -> None: # Close thread. self.deploy_thread.join() - print(self.exit_messages) - # Check if we got any error messages. show_output_box = True if any(len(val[1]) > 0 for val in self.exit_messages.items()): diff --git a/utils/deploy_options.py b/utils/deploy_options.py index 752284d..55c62f1 100644 --- a/utils/deploy_options.py +++ b/utils/deploy_options.py @@ -32,9 +32,15 @@ def change_port_vlans(old_vlan, new_vlan, toggle_voice_vlans, ssh_device): for interface in ssh_device["interfaces"]: # Catch key errors for malformed interface output. try: - # Check the interface vlan. - if int(interface["switchport access vlan"]) == int(old_vlan) and interface["vlan_status"] == old_vlan and not interface["switchport mode trunk"] and ("Ap" not in interface["name"]) and "trunk" not in interface["vlan_status"]: - interface_vlan_change_list.append(interface["name"]) + # Check if we are changing voice VLAN ports. + if toggle_voice_vlans: + # Check the voice vlan. + if int(interface["switchport voice vlan"]) == int(old_vlan) and not interface["switchport mode trunk"] and ("Ap" not in interface["name"]) and "trunk" not in interface["vlan_status"]: + interface_vlan_change_list.append(interface["name"]) + else: + # Check the interface vlan. + if int(interface["switchport access vlan"]) == int(old_vlan) and interface["vlan_status"] == old_vlan and not interface["switchport mode trunk"] and ("Ap" not in interface["name"]) and "trunk" not in interface["vlan_status"]: + interface_vlan_change_list.append(interface["name"]) except KeyError as error: logger.error(f"KeyError ({error}): An interface output for {ssh_device['hostname']} was not received properly, skipping...") @@ -102,6 +108,7 @@ def change_port_vlans(old_vlan, new_vlan, toggle_voice_vlans, ssh_device): command_text += f"sw acc vlan {new_vlan}\n" # Add end to exit global config mode. command_text += "end\n" + command_text += "show int status\n" return command_text