diff --git a/interface/main_window.py b/interface/main_window.py index 6bf7804..128894b 100644 --- a/interface/main_window.py +++ b/interface/main_window.py @@ -1,18 +1,16 @@ -from cgi import test -from cgitb import enable import logging import os import sys -from turtle import back, width import webbrowser import tkinter as tk from threading import Thread from tkinter import messagebox from tkinter import font - +from netmiko.exceptions import ReadTimeout +from interface.popup_window import text_popup from pyvis.network import Network -from utils.open_connection import ssh_autodetect_info +from utils.open_connection import ssh_autodetect_info, ssh_telnet sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from utils.net_crawl import cdp_auto_discover, clear_discoveries @@ -40,6 +38,7 @@ def __init__(self) -> None: self.ip_list = [] self.discovery_list = [] self.export_info_list = [] + self.selection_devices_list = [] self.already_auto_discovering = False self.export_permission_error = False self.enable_telnet_check = None @@ -47,6 +46,7 @@ def __init__(self) -> None: # Create UI components. self.window = None + self.command_textbox = None self.tree_view_frame = None self.tree_canvas = None self.closest_hop_entry = None @@ -146,20 +146,22 @@ def initialize_window(self) -> None: closest_hop_label.grid(row=2, rowspan=1, column=0, columnspan=1, sticky=tk.EW) self.closest_hop_entry = tk.Entry(master=device_list_frame, width=10) self.closest_hop_entry.grid(row=2, rowspan=1, column=1, columnspan=1, sticky=tk.EW) + button_deploy = tk.Button(master=device_list_frame, text="DEPLOY", foreground="red", background="white", command=self.deploy_button_callback) + button_deploy.grid(row=5, rowspan=1, column=0, columnspan=2, sticky=tk.EW) button_ping = tk.Button(master=device_list_frame, text="Ping Check", foreground="black", background="white", command=self.mass_ping_button_callback) button_ping.grid(row=9, rowspan=1, column=0, sticky=tk.EW) button_discover = tk.Button(master=device_list_frame, text="Find Switches", foreground="black", background="white", command=self.auto_discover_switches_callback) button_discover.grid(row=9, rowspan=1, column=1, sticky=tk.EW) discover_warning = tk.Label(master=device_list_frame, text="Deploy Commands: (Use caution! Incorrect configuration can really suck!)") discover_warning.grid(row=0, rowspan=1, column=4, columnspan=6, sticky=tk.W) - command_textbox = tk.Text(master=device_list_frame, width=10, height=5, wrap="none") - command_textbox.grid(row=1, rowspan=8, column=4, columnspan=6, sticky=tk.NSEW) - scroll_y=tk.Scrollbar(master=device_list_frame, orient='vertical', command=command_textbox.yview) # Add a scrollbar. + self.command_textbox = tk.Text(master=device_list_frame, width=10, height=5, wrap="none") + self.command_textbox.grid(row=1, rowspan=8, column=4, columnspan=6, sticky=tk.NSEW) + scroll_y=tk.Scrollbar(master=device_list_frame, orient='vertical', command=self.command_textbox.yview) # Add a scrollbar. scroll_y.grid(row=1, rowspan=8, column=10, columnspan=1, sticky=tk.NS) - command_textbox['yscrollcommand'] = scroll_y.set - scroll_x=tk.Scrollbar(master=device_list_frame, orient='horizontal', command=command_textbox.xview) # Add a scrollbar. + self.command_textbox['yscrollcommand'] = scroll_y.set + scroll_x=tk.Scrollbar(master=device_list_frame, orient='horizontal', command=self.command_textbox.xview) # Add a scrollbar. scroll_x.grid(row=9, rowspan=1, column=4, columnspan=6, sticky="new") - command_textbox['xscrollcommand'] = scroll_x.set + self.command_textbox['xscrollcommand'] = scroll_x.set # Populate tree view frame. # Create a scrollbar and canvas to store everything in. This is the only way to have a horizontal scrollbar for the whole frame. @@ -309,6 +311,9 @@ def auto_discover_switches_callback(self) -> None: if not self.already_auto_discovering: # Clear data lists from discover module. clear_discoveries() + self.switch_tree_checkboxes.clear() + self.switch_tree_check_values.clear() + self.selection_devices_list.clear() # Get username and password lists. usernames = [username.get() for username in self.username_entrys] passwords = [password.get() for password in self.password_entrys] @@ -543,6 +548,139 @@ def auto_discover_back_process(self, text, usernames, passwords, enable_secrets, # Reset safety toggle. self.already_auto_discovering = False + def deploy_button_callback(self) -> None: + """ + This function is triggered everytime the Deploy button is pressed. The process for this button click triggers + a new thread that send the given commands to all selected devices. + + Parameters: + ----------- + None + + Returns: + -------- + Nothing + """ + # Loop through each boolean value and match it to the device ip. + selected_devices = [] + for selected, device in zip(self.switch_tree_check_values, self.selection_devices_list): + # Get the selected devices status and append them to a list if true. + if selected.get(): + # Insert object at beginning of list. This effectively reverses the list, so we can just go through is from the beginning when deploying commands. + selected_devices.insert(0, device) + + # Get username and password lists. + usernames = [username.get() for username in self.username_entrys] + passwords = [password.get() for password in self.password_entrys] + enable_secrets = [secret.get() for secret in self.secret_entrys] + # Remove empty passwords. + for i, password in enumerate(passwords): + # Check length. + if len(password) <= 0: + # Remove list item. + usernames.pop(i) + passwords.pop(i) + enable_secrets.pop(i) + + # Get commands from textbox. + text = self.command_textbox.get('1.0', tk.END) + + # Check if selected devices list is empty. + if len(selected_devices) > 0: + # Before starting threads, ask user if they are sure they want to continue. + self.logger.info("Asking user if they are sure they want to start the deploy process...") + user_result = messagebox.askyesno(title="ATTENTION!", message="Are you sure you want to start the deploy process? This will make changes to the devices!") + # Check user choice. + if user_result: + # Print log. + self.logger.info("Command deploy has been started!") + # Start a new thread that connects to each ip and runs the given commands. + # Thread(target=self.deploy_button_back_process, args=(text, usernames, passwords, enable_secrets, self.enable_telnet_check.get(), self.force_telnet_check.get())).start() + bad_deploys = self.deploy_button_back_process(text, selected_devices, usernames, passwords, enable_secrets, self.enable_telnet_check.get(), self.force_telnet_check.get()) + + # Print log and show messagebox stating the deploy has finished. + self.logger.info(f"The command deploy has finished! {len(bad_deploys)} out of {len(selected_devices)} did not successfully execute the given commands. Opening window with the IPs now...") + messagebox.showinfo(message=f"The command deploy has finished! {len(bad_deploys)} out of {len(selected_devices)} did not successfully execute the given commands.") + # Check if we need to open the window. + if len(bad_deploys) > 0: + text_popup(title="Bad Deploy Devices", text=[f"{device['ip_addr']} - {device['hostname']}\n" for device in bad_deploys]) + else: + # Print log. + self.logger.info("Command deploy has been canceled.") + else: + # Print log and show messagebox. + self.logger.warning("Can't start deploy because no devices have been selected.") + messagebox.showwarning(message="Can't start deploy because no devices have been selected.") + + def deploy_button_back_process(self, command_text, devices, usernames, passwords, enable_secrets, enable_telnet, force_telnet) -> None: + """ + Helper function for Deploy Button, runs in a new thread. + """ + # Create instance variables. + bad_deploys = [] + + # Loop through all selected devices. + for device in devices: + # Attempt to login to the first device. + ssh_device = ssh_autodetect_info(usernames, passwords, enable_secrets, enable_telnet, force_telnet, device["ip_addr"]) + connection = ssh_telnet(ssh_device, enable_telnet, force_telnet, store_config_info=False) + + # Check if device connection was successful. + if connection is not None and connection.is_alive(): + # Check if commands are empty. + if len(command_text) > 1: + # Print log. + self.logger.info(f"Connected to device {device['ip_addr']}. Running commands...") + + output = "" + # Run the commands on the switch and show output, then ask the user if the output looks good. + for line in command_text.splitlines(): + # Catch timeouts. + try: + # Send the current command to the switch. + output += connection.send_command(line, expect_string="#") + except ReadTimeout: + self.logger.warning(f"Couldn't get command output for {device['ip_addr']}. It is likely the commands still ran.") + messagebox.showwarning(message=f"Couldn't get command output for {device['ip_addr']}. However, it is likely the commands still ran and the console just took too long to print output.") + + # Ask the user if the output is correct. + correct_output = messagebox.askyesno(title=f"Confirm correct output for {device['hostname']}, {device['ip_addr']}", message="Is this output correct?") #### CRASHES HERE. + # Ask the user if they want to continue. + continue_deploy = messagebox.askyesno(title="Continue deploy?", message="Would you like to continue the command deploy?") + # Show the output to the user and ask if it is correct. + text_popup(f"Command Output for {device['hostname']}, {device['ip_addr']}", output, x_grid_size=10, y_grid_size=10) + + # If the output was incorrect add the switch to a list. + if not correct_output: + bad_deploys.append(device) + # If the user doesn't want to continue the deploy then stop looping. + if not continue_deploy: + # Print log. + self.logger.info("The deploy has been canceled by the user.") + # Exit for loop. + break + else: + # Print log and show messagebox to user. + self.logger.info("Command textbox is empty!") + messagebox.showwarning(message="Command textbox is empty!") + # Store all device in the bad deploy list. + bad_deploys = devices + # Disconnection from the current device. + connection.disconnect() + # Exit for loop. + break + + # Disconnection from device. + connection.disconnect() + else: + # Print log and show messagebox. + self.logger.error(f"Failed to connection to {device['ip_addr']}") + messagebox.showerror(message=f"Couldn't connect to {device['ip_addr']}. Moving on to next device.") + # Append current device to bad deploy list. + bad_deploys.append(device) + + return bad_deploys + def mass_ping_button_callback(self) -> None: """ This function is triggered everytime the Mass Ping button is pressed. The process for this button click triggers @@ -556,20 +694,24 @@ def mass_ping_button_callback(self) -> None: -------- Nothing """ + # Create instance variables. + text = [] + # Print status to console. self.logger.info("\n---------------------------------------------------------\nPinging all devices now...\n---------------------------------------------------------") - # Get text from textbox. - text = [self.closest_hop_entry.get()] - # Ping each switch listed in the textbox to get a list containing their status. - # Thread(target=ping_of_death, args=(text, self.ip_list,)).start() - - test = [] - for selected, device in zip(self.switch_tree_check_values, self.export_info_list): - if selected.get(): - test.append(device) + # Check if scan has been ran. + if len(self.switch_tree_check_values) > 0 and any([value.get() for value in self.switch_tree_check_values]): + # Loop through each boolean value and match it to the device ip. + for selected, device in zip(self.switch_tree_check_values, self.selection_devices_list): + if selected.get(): + text.append(device["ip_addr"]) + else: + # Get text from textbox. + text = [self.closest_hop_entry.get()] - print(test) + # Ping each switch listed in the textbox to get a list containing their status. + Thread(target=ping_of_death, args=(text, self.ip_list,)).start() def update_window(self) -> None: """ @@ -643,9 +785,10 @@ def update_window(self) -> None: # Append to list. self.switch_tree_check_values.append(check_value) self.switch_tree_checkboxes.append(checkbox) + self.selection_devices_list.append(device) # Add to textbox. text_box.window_create("1.0", window=checkbox) - text_box.insert("end", "\n") + text_box.insert("end", " \n") # Add scrollbar to textbox. scrolly=tk.Scrollbar(master=self.tree_view_frame, orient='vertical', command=text_box.yview) # Add a scrollbar. diff --git a/test.py b/test.py deleted file mode 100644 index 60c31b2..0000000 --- a/test.py +++ /dev/null @@ -1,86 +0,0 @@ -from tkinter import * - -from tkinter import ttk - - - -root = Tk() - -root.title('Full Window Scrolling X Y Scrollbar Example') - -root.geometry("1350x400") - - - -# Create A Main frame - -main_frame = Frame(root) - -main_frame.pack(fill=BOTH,expand=1) - - - -# Create Frame for X Scrollbar - -sec = Frame(main_frame) - -sec.pack(fill=X,side=BOTTOM) - - - -# Create A Canvas - -my_canvas = Canvas(main_frame) - -my_canvas.pack(side=LEFT,fill=BOTH,expand=1) - - - -# Add A Scrollbars to Canvas - -x_scrollbar = ttk.Scrollbar(sec,orient=HORIZONTAL,command=my_canvas.xview) - -x_scrollbar.pack(side=BOTTOM,fill=X) - -y_scrollbar = ttk.Scrollbar(main_frame,orient=VERTICAL,command=my_canvas.yview) -y_scrollbar.pack(side=RIGHT,fill=Y) - - - -# Configure the canvas - -my_canvas.configure(xscrollcommand=x_scrollbar.set) - -my_canvas.configure(yscrollcommand=y_scrollbar.set) - -my_canvas.bind("",lambda e: my_canvas.config(scrollregion= my_canvas.bbox(ALL))) - - - -# Create Another Frame INSIDE the Canvas - -second_frame = Frame(my_canvas) - - - -# Add that New Frame a Window In The Canvas - -my_canvas.create_window((0,0),window= second_frame, anchor="nw") - - - - - -for thing in range(100): - - Button(second_frame ,text=f"Button {thing}").grid(row=5,column=thing,pady=10,padx=10) - - - -for thing in range(100): - - Button(second_frame ,text=f"Button {thing}").grid(row=thing,column=5,pady=10,padx=10) - - - -root.mainloop() \ No newline at end of file