Skip to content

Commit

Permalink
Merge pull request #1 from ClayJay3/temp/vlan-change
Browse files Browse the repository at this point in the history
Temp/vlan change Merge
  • Loading branch information
ClayJay3 authored Oct 24, 2022
2 parents 6faa7f3 + 189bcfc commit 21c3e47
Show file tree
Hide file tree
Showing 3 changed files with 227 additions and 85 deletions.
223 changes: 163 additions & 60 deletions interface/main_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def __init__(self) -> None:
self.already_auto_discovering = False
self.export_permission_error = False
self.enable_telnet_check = None
self.change_vlan_check = None
self.force_telnet_check = None

# Create UI components.
Expand All @@ -50,6 +51,9 @@ def __init__(self) -> None:
self.tree_view_frame = None
self.tree_canvas = None
self.closest_hop_entry = None
self.vlan_old_entry = None
self.vlan_new_entry = None
self.vlan_entries_state_enabled = True

# Open log file for displaying in console window.
self.log_file = open("logs/latest.log", "r", encoding="utf-8")
Expand Down Expand Up @@ -89,6 +93,7 @@ def initialize_window(self) -> None:
self.window.attributes("-topmost", False)
# Create checkbox variables.
self.enable_telnet_check = tk.BooleanVar(self.window)
self.change_vlan_check = tk.BooleanVar(self.window)
self.force_telnet_check = tk.BooleanVar(self.window)

# Setup window grid layout.
Expand Down Expand Up @@ -212,6 +217,16 @@ def initialize_window(self) -> None:
# Populate option frame.
secret_label = tk.Label(master=options_frame, text="Options:", font=(self.font, 14))
secret_label.grid(row=0, column=0, columnspan=10, sticky=tk.NSEW)
enable_vlan_change_checkbox = tk.Checkbutton(master=options_frame, variable=self.change_vlan_check, onvalue=True, offvalue=False)
enable_vlan_change_checkbox.grid(row=1, rowspan=1, column=0, columnspan=1, sticky=tk.E)
vlan_change_text = tk.Label(master=options_frame, text="Change VLAN", font=(self.font, 10))
vlan_change_text.grid(row=1, column=1, columnspan=1, sticky=tk.W)
self.vlan_old_entry = tk.Entry(master=options_frame, width=10, validate="all", validatecommand=(options_frame.register(lambda input:True if str.isdigit(input) or input == "" else False), "%P"))
self.vlan_old_entry.grid(row=1, column=2, sticky=tk.W)
vlan_change_text = tk.Label(master=options_frame, text=" access ports to VLAN", font=(self.font, 10))
vlan_change_text.grid(row=1, column=3, columnspan=1, sticky=tk.W)
self.vlan_new_entry = tk.Entry(master=options_frame, width=10, validate="all", validatecommand=(options_frame.register(lambda input:True if (str.isdigit(input) or input == "") else False), "%P"))
self.vlan_new_entry.grid(row=1, column=4, sticky=tk.W)

# Populate console frame.
self.list = tk.Listbox(master=console_frame, background="black", foreground="green", highlightcolor="green")
Expand Down Expand Up @@ -561,6 +576,9 @@ def deploy_button_callback(self) -> None:
--------
Nothing
"""
# Create deploy output directory.
os.makedirs("deploy_outputs", exist_ok=True)

# Loop through each boolean value and match it to the device ip.
selected_devices = []
for selected, device in zip(self.switch_tree_check_values, self.selection_devices_list):
Expand All @@ -586,27 +604,33 @@ def deploy_button_callback(self) -> None:
text = self.command_textbox.get('1.0', tk.END)

# Check if selected devices list is empty.
if len(selected_devices) > 0:
# Before starting threads, ask user if they are sure they want to continue.
self.logger.info("Asking user if they are sure they want to start the deploy process...")
user_result = messagebox.askyesno(title="ATTENTION!", message="Are you sure you want to start the deploy process? This will make changes to the devices!")
# Check user choice.
if user_result:
# Print log.
self.logger.info("Command deploy has been started!")
# Start a new thread that connects to each ip and runs the given commands.
# Thread(target=self.deploy_button_back_process, args=(text, usernames, passwords, enable_secrets, self.enable_telnet_check.get(), self.force_telnet_check.get())).start()
bad_deploys = self.deploy_button_back_process(text, selected_devices, usernames, passwords, enable_secrets, self.enable_telnet_check.get(), self.force_telnet_check.get())

# Print log and show messagebox stating the deploy has finished.
self.logger.info(f"The command deploy has finished! {len(bad_deploys)} out of {len(selected_devices)} did not successfully execute the given commands. Opening window with the IPs now...")
messagebox.showinfo(message=f"The command deploy has finished! {len(bad_deploys)} out of {len(selected_devices)} did not successfully execute the given commands.")
# Check if we need to open the window.
if len(bad_deploys) > 0:
text_popup(title="Bad Deploy Devices", text=[f"{device['ip_addr']} - {device['hostname']}\n" for device in bad_deploys])
if len(selected_devices) > 0:
# Check to make sure the user entered something for the vlan change if selected.
if self.change_vlan_check.get() and len(self.vlan_old_entry.get()) <= 0 and len(self.vlan_new_entry.get()) <= 0:
# Print log and show messagebox.
self.logger.warning("Can't start deploy because no vlans have been entered to change! If you don't want to enter vlans, then uncheck the box in the options pane.")
messagebox.showwarning(message="Can't start deploy because the user didn't specify old and new vlans even though the checkbox was ticked.")
else:
# Print log.
self.logger.info("Command deploy has been canceled.")
# Before starting threads, ask user if they are sure they want to continue.
self.logger.info("Asking user if they are sure they want to start the deploy process...")
user_result = messagebox.askyesno(title="ATTENTION!", message="Are you sure you want to start the deploy process? This will make changes to the devices!")
# Check user choice.
if user_result:
# Print log.
self.logger.info("Command deploy has been started!")
# Start a new thread that connects to each ip and runs the given commands.
# Thread(target=self.deploy_button_back_process, args=(text, usernames, passwords, enable_secrets, self.enable_telnet_check.get(), self.force_telnet_check.get())).start()
bad_deploys = self.deploy_button_back_process(text, selected_devices, usernames, passwords, enable_secrets, self.enable_telnet_check.get(), self.force_telnet_check.get())

# Print log and show messagebox stating the deploy has finished.
self.logger.info(f"The command deploy has finished! {len(bad_deploys)} out of {len(selected_devices)} did not successfully execute the given commands. Opening window with the IPs now...")
messagebox.showinfo(message=f"The command deploy has finished! {len(bad_deploys)} out of {len(selected_devices)} did not successfully execute the given commands.")
# Check if we need to open the window.
if len(bad_deploys) > 0:
text_popup(title="Bad Deploy Devices", text=[f"{device['ip_addr']} - {device['hostname']}\n" for device in bad_deploys])
else:
# Print log.
self.logger.info("Command deploy has been canceled.")
else:
# Print log and show messagebox.
self.logger.warning("Can't start deploy because no devices have been selected.")
Expand All @@ -622,57 +646,119 @@ def deploy_button_back_process(self, command_text, devices, usernames, passwords
# Set window size. Take it out of fullscreen.
self.window.wm_state('iconic')

# Print log.
self.logger.info(f"NetCommander will be running these commands on the selected switches: \n{command_text}")

# Loop through all selected devices.
for device in devices:
# Attempt to login to the first device.
ssh_device = ssh_autodetect_info(usernames, passwords, enable_secrets, enable_telnet, force_telnet, device["ip_addr"])
connection = ssh_telnet(ssh_device, enable_telnet, force_telnet, store_config_info=False)
connection = ssh_telnet(ssh_device, enable_telnet, force_telnet, store_config_info=True)

# Check if device connection was successful.
if connection is not None and connection.is_alive():
# Check if commands are empty.
if len(command_text) > 1:
# Print log.
self.logger.info(f"Connected to device {device['ip_addr']}. Running commands...")

output = ""
# Run the commands on the switch and show output, then ask the user if the output looks good.
for line in command_text.splitlines():
# Catch timeouts.
try:
# Send the current command to the switch.
output += connection.send_command(line, expect_string="#")
except ReadTimeout:
self.logger.warning(f"Couldn't get command output for {device['ip_addr']}. It is likely the commands still ran.")
messagebox.showwarning(message=f"Couldn't get command output for {device['ip_addr']}. However, it is likely the commands still ran and the console just took too long to print output.")

# Show the output to the user and ask if it is correct.
text_popup(f"Command Output for {device['hostname']}, {device['ip_addr']}", output, x_grid_size=10, y_grid_size=10)
# Ask the user if the output is correct.
correct_output = messagebox.askyesno(title=f"Confirm correct output for {device['hostname']}, {device['ip_addr']}", message="Is this output correct?")
# Ask the user if they want to continue.
continue_deploy = messagebox.askyesno(title="Continue deploy?", message="Would you like to continue the command deploy?")

# If the output was incorrect add the switch to a list.
if not correct_output:
bad_deploys.append(device)
# If the user doesn't want to continue the deploy then stop looping.
if not continue_deploy:
# Check the privilege level of our connection. Must be 15 to execute all commands.
if "15" in connection.send_command("show privilege"):
# Check if commands are empty.
if len(command_text) > 1:
# Print log.
self.logger.info("The deploy has been canceled by the user.")
self.logger.info(f"Connected to device {device['ip_addr']}. Running commands...")

output = ""
# Run the commands on the switch and show output, then ask the user if the output looks good.
for line in command_text.splitlines():
# Catch timeouts.
try:
# Send the current command to the switch.
output += f"\n{connection.find_prompt()}{line}\n"
output += connection.send_command(line, expect_string="#")
except ReadTimeout:
self.logger.warning(f"Couldn't get command output for {device['ip_addr']}. It is likely the commands still ran.")
messagebox.showwarning(message=f"Couldn't get command output for {device['ip_addr']}. However, it is likely the commands still ran and the console just took too long to print output.")

# Check if the user have enabled vlan changing.
if self.change_vlan_check.get():
# Get vlan new and old numbers from the user.
old_vlan = self.vlan_old_entry.get()
# Change vlan 1 to 0.
if int(old_vlan) <= 1:
old_vlan = 0
new_vlan = self.vlan_new_entry.get()

# Loop through interfaces and get a list of interfaces with interfaces on VLAN1.
interface_vlan_change_list = []
for interface in ssh_device["interfaces"]:
# Catch key errors for malformed interface output.
try:
# Check the interface vlan.
if interface["switchport access vlan"] == old_vlan and not interface["switchport mode trunk"] and ("Fa" not in interface["name"] and "Po" not in interface["name"] and "Ap" not in interface["name"]) and "trunk" not in interface["vlan_status"]:
interface_vlan_change_list.append(interface["name"])
except KeyError:
self.logger.error(f"KeyError: An interface output for {device['hostname']} was not received properly, skipping...")

# Check that we have at least 1 interface to change.
vlan_command_text = ""
if len(interface_vlan_change_list) > 0:
# Append interfaces changed to output
output += f"\n\n{interface_vlan_change_list}"
# Build commands list for vlan change.
vlan_command_text += "end\nconf t\nint range "
for interface in interface_vlan_change_list:
vlan_command_text += interface + ", "
vlan_command_text = vlan_command_text[:-2] + "\n"
vlan_command_text += f"sw acc vlan {new_vlan}\nend\n"

# Run the commands on the switch and show output, then ask the user if the output looks good.
output += "\n\n"
for line in vlan_command_text.splitlines():
# Catch timeouts.
try:
# Send the current command to the switch.
output += f"\n{connection.find_prompt()}{line}\n"
output += connection.send_command(line, expect_string="#")
except ReadTimeout:
self.logger.warning(f"Couldn't get command output for {device['ip_addr']}. It is likely the commands still ran.")
messagebox.showwarning(message=f"Couldn't get command output for {device['ip_addr']}. However, it is likely the commands still ran and the console just took too long to print output.")

# Show the output to the user and ask if it is correct.
text_popup(f"Command Output for {device['hostname']}, {device['ip_addr']}", output, x_grid_size=10, y_grid_size=10)
# Write output to a file.
with open(f"deploy_outputs/{device['hostname']}({device['ip_addr']}).txt", 'w+') as file:
for line in output:
file.write(line)
# Ask the user if the output is correct.
correct_output = messagebox.askyesno(title=f"Confirm correct output for {device['hostname']}, {device['ip_addr']}", message="Is this output correct? Its output will be saved to the deploy_outputs folder.")
# Ask the user if they want to continue.
continue_deploy = messagebox.askyesno(title="Continue deploy?", message="Would you like to continue the command deploy?")

# If the output was incorrect add the switch to a list.
if not correct_output:
bad_deploys.append(device)
# If the user doesn't want to continue the deploy then stop looping.
if not continue_deploy:
# Print log.
self.logger.info("The deploy has been canceled by the user.")
# Disconnection from the current device.
connection.disconnect()
# Exit for loop.
break
else:
# Print log and show messagebox to user.
self.logger.info("Command textbox is empty!")
messagebox.showwarning(message="Command textbox is empty!")
# Store all device in the bad deploy list.
bad_deploys = devices
# Disconnection from the current device.
connection.disconnect()
# Exit for loop.
break
else:
# Print log and show messagebox to user.
self.logger.info("Command textbox is empty!")
messagebox.showwarning(message="Command textbox is empty!")
# Store all device in the bad deploy list.
bad_deploys = devices
# Disconnection from the current device.
connection.disconnect()
# Exit for loop.
break

# Print log and show messagebox.
self.logger.error(f"Insignificant privilege level to safely run all commands on {device['ip_addr']}. Skipping and adding to bad deploy list...")
messagebox.showerror(message=f"Insignificant privilege level to safely run all commands on {device['ip_addr']}. The device will be skipped and marked as a bad deploy.")
# Append current device to bad deploy list.
bad_deploys.append(device)

# Disconnection from device.
connection.disconnect()
else:
Expand All @@ -685,6 +771,9 @@ def deploy_button_back_process(self, command_text, devices, usernames, passwords
# Update main window.
self.window.update()

# Print bad deploy devices to logs.
self.logger.info(f"Unable to fully deploy commands to these devices: {bad_deploys}")

return bad_deploys

def mass_ping_button_callback(self) -> None:
Expand Down Expand Up @@ -739,6 +828,20 @@ def update_window(self) -> None:
else:
self.list.insert(0, line)

# Update options entries and checkboxes.
if not self.change_vlan_check.get() and self.vlan_entries_state_enabled:
# Disable elements.
self.vlan_old_entry.configure(state="disable")
self.vlan_new_entry.configure(state="disable")
# Update toggle var.
self.vlan_entries_state_enabled = False
elif self.change_vlan_check.get() and not self.vlan_entries_state_enabled:
# Disable elements.
self.vlan_old_entry.configure(state="normal")
self.vlan_new_entry.configure(state="normal")
# Update toggle var.
self.vlan_entries_state_enabled = True

# Update the textbox with the discovering list if it's not empty and not being updated.
if len(self.discovery_list) > 0 and not self.already_auto_discovering:
# Delete the current contents of the frame.
Expand Down
Loading

0 comments on commit 21c3e47

Please sign in to comment.