Skip to content

Commit

Permalink
Finished deploy logic, you can now use this program to modify switches.
Browse files Browse the repository at this point in the history
  • Loading branch information
ClayJay3 committed Aug 29, 2022
1 parent 42dc7ff commit 0a9c5b0
Show file tree
Hide file tree
Showing 2 changed files with 165 additions and 108 deletions.
187 changes: 165 additions & 22 deletions interface/main_window.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
from cgi import test
from cgitb import enable
import logging
import os
import sys
from turtle import back, width
import webbrowser
import tkinter as tk
from threading import Thread
from tkinter import messagebox
from tkinter import font

from netmiko.exceptions import ReadTimeout
from interface.popup_window import text_popup
from pyvis.network import Network

from utils.open_connection import ssh_autodetect_info
from utils.open_connection import ssh_autodetect_info, ssh_telnet

sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from utils.net_crawl import cdp_auto_discover, clear_discoveries
Expand Down Expand Up @@ -40,13 +38,15 @@ def __init__(self) -> None:
self.ip_list = []
self.discovery_list = []
self.export_info_list = []
self.selection_devices_list = []
self.already_auto_discovering = False
self.export_permission_error = False
self.enable_telnet_check = None
self.force_telnet_check = None

# Create UI components.
self.window = None
self.command_textbox = None
self.tree_view_frame = None
self.tree_canvas = None
self.closest_hop_entry = None
Expand Down Expand Up @@ -146,20 +146,22 @@ def initialize_window(self) -> None:
closest_hop_label.grid(row=2, rowspan=1, column=0, columnspan=1, sticky=tk.EW)
self.closest_hop_entry = tk.Entry(master=device_list_frame, width=10)
self.closest_hop_entry.grid(row=2, rowspan=1, column=1, columnspan=1, sticky=tk.EW)
button_deploy = tk.Button(master=device_list_frame, text="DEPLOY", foreground="red", background="white", command=self.deploy_button_callback)
button_deploy.grid(row=5, rowspan=1, column=0, columnspan=2, sticky=tk.EW)
button_ping = tk.Button(master=device_list_frame, text="Ping Check", foreground="black", background="white", command=self.mass_ping_button_callback)
button_ping.grid(row=9, rowspan=1, column=0, sticky=tk.EW)
button_discover = tk.Button(master=device_list_frame, text="Find Switches", foreground="black", background="white", command=self.auto_discover_switches_callback)
button_discover.grid(row=9, rowspan=1, column=1, sticky=tk.EW)
discover_warning = tk.Label(master=device_list_frame, text="Deploy Commands: (Use caution! Incorrect configuration can really suck!)")
discover_warning.grid(row=0, rowspan=1, column=4, columnspan=6, sticky=tk.W)
command_textbox = tk.Text(master=device_list_frame, width=10, height=5, wrap="none")
command_textbox.grid(row=1, rowspan=8, column=4, columnspan=6, sticky=tk.NSEW)
scroll_y=tk.Scrollbar(master=device_list_frame, orient='vertical', command=command_textbox.yview) # Add a scrollbar.
self.command_textbox = tk.Text(master=device_list_frame, width=10, height=5, wrap="none")
self.command_textbox.grid(row=1, rowspan=8, column=4, columnspan=6, sticky=tk.NSEW)
scroll_y=tk.Scrollbar(master=device_list_frame, orient='vertical', command=self.command_textbox.yview) # Add a scrollbar.
scroll_y.grid(row=1, rowspan=8, column=10, columnspan=1, sticky=tk.NS)
command_textbox['yscrollcommand'] = scroll_y.set
scroll_x=tk.Scrollbar(master=device_list_frame, orient='horizontal', command=command_textbox.xview) # Add a scrollbar.
self.command_textbox['yscrollcommand'] = scroll_y.set
scroll_x=tk.Scrollbar(master=device_list_frame, orient='horizontal', command=self.command_textbox.xview) # Add a scrollbar.
scroll_x.grid(row=9, rowspan=1, column=4, columnspan=6, sticky="new")
command_textbox['xscrollcommand'] = scroll_x.set
self.command_textbox['xscrollcommand'] = scroll_x.set

# Populate tree view frame.
# Create a scrollbar and canvas to store everything in. This is the only way to have a horizontal scrollbar for the whole frame.
Expand Down Expand Up @@ -309,6 +311,9 @@ def auto_discover_switches_callback(self) -> None:
if not self.already_auto_discovering:
# Clear data lists from discover module.
clear_discoveries()
self.switch_tree_checkboxes.clear()
self.switch_tree_check_values.clear()
self.selection_devices_list.clear()
# Get username and password lists.
usernames = [username.get() for username in self.username_entrys]
passwords = [password.get() for password in self.password_entrys]
Expand Down Expand Up @@ -543,6 +548,139 @@ def auto_discover_back_process(self, text, usernames, passwords, enable_secrets,
# Reset safety toggle.
self.already_auto_discovering = False

def deploy_button_callback(self) -> None:
"""
This function is triggered everytime the Deploy button is pressed. The process for this button click triggers
a new thread that send the given commands to all selected devices.
Parameters:
-----------
None
Returns:
--------
Nothing
"""
# Loop through each boolean value and match it to the device ip.
selected_devices = []
for selected, device in zip(self.switch_tree_check_values, self.selection_devices_list):
# Get the selected devices status and append them to a list if true.
if selected.get():
# Insert object at beginning of list. This effectively reverses the list, so we can just go through is from the beginning when deploying commands.
selected_devices.insert(0, device)

# Get username and password lists.
usernames = [username.get() for username in self.username_entrys]
passwords = [password.get() for password in self.password_entrys]
enable_secrets = [secret.get() for secret in self.secret_entrys]
# Remove empty passwords.
for i, password in enumerate(passwords):
# Check length.
if len(password) <= 0:
# Remove list item.
usernames.pop(i)
passwords.pop(i)
enable_secrets.pop(i)

# Get commands from textbox.
text = self.command_textbox.get('1.0', tk.END)

# Check if selected devices list is empty.
if len(selected_devices) > 0:
# Before starting threads, ask user if they are sure they want to continue.
self.logger.info("Asking user if they are sure they want to start the deploy process...")
user_result = messagebox.askyesno(title="ATTENTION!", message="Are you sure you want to start the deploy process? This will make changes to the devices!")
# Check user choice.
if user_result:
# Print log.
self.logger.info("Command deploy has been started!")
# Start a new thread that connects to each ip and runs the given commands.
# Thread(target=self.deploy_button_back_process, args=(text, usernames, passwords, enable_secrets, self.enable_telnet_check.get(), self.force_telnet_check.get())).start()
bad_deploys = self.deploy_button_back_process(text, selected_devices, usernames, passwords, enable_secrets, self.enable_telnet_check.get(), self.force_telnet_check.get())

# Print log and show messagebox stating the deploy has finished.
self.logger.info(f"The command deploy has finished! {len(bad_deploys)} out of {len(selected_devices)} did not successfully execute the given commands. Opening window with the IPs now...")
messagebox.showinfo(message=f"The command deploy has finished! {len(bad_deploys)} out of {len(selected_devices)} did not successfully execute the given commands.")
# Check if we need to open the window.
if len(bad_deploys) > 0:
text_popup(title="Bad Deploy Devices", text=[f"{device['ip_addr']} - {device['hostname']}\n" for device in bad_deploys])
else:
# Print log.
self.logger.info("Command deploy has been canceled.")
else:
# Print log and show messagebox.
self.logger.warning("Can't start deploy because no devices have been selected.")
messagebox.showwarning(message="Can't start deploy because no devices have been selected.")

def deploy_button_back_process(self, command_text, devices, usernames, passwords, enable_secrets, enable_telnet, force_telnet) -> None:
"""
Helper function for Deploy Button, runs in a new thread.
"""
# Create instance variables.
bad_deploys = []

# Loop through all selected devices.
for device in devices:
# Attempt to login to the first device.
ssh_device = ssh_autodetect_info(usernames, passwords, enable_secrets, enable_telnet, force_telnet, device["ip_addr"])
connection = ssh_telnet(ssh_device, enable_telnet, force_telnet, store_config_info=False)

# Check if device connection was successful.
if connection is not None and connection.is_alive():
# Check if commands are empty.
if len(command_text) > 1:
# Print log.
self.logger.info(f"Connected to device {device['ip_addr']}. Running commands...")

output = ""
# Run the commands on the switch and show output, then ask the user if the output looks good.
for line in command_text.splitlines():
# Catch timeouts.
try:
# Send the current command to the switch.
output += connection.send_command(line, expect_string="#")
except ReadTimeout:
self.logger.warning(f"Couldn't get command output for {device['ip_addr']}. It is likely the commands still ran.")
messagebox.showwarning(message=f"Couldn't get command output for {device['ip_addr']}. However, it is likely the commands still ran and the console just took too long to print output.")

# Ask the user if the output is correct.
correct_output = messagebox.askyesno(title=f"Confirm correct output for {device['hostname']}, {device['ip_addr']}", message="Is this output correct?") #### CRASHES HERE.
# Ask the user if they want to continue.
continue_deploy = messagebox.askyesno(title="Continue deploy?", message="Would you like to continue the command deploy?")
# Show the output to the user and ask if it is correct.
text_popup(f"Command Output for {device['hostname']}, {device['ip_addr']}", output, x_grid_size=10, y_grid_size=10)

# If the output was incorrect add the switch to a list.
if not correct_output:
bad_deploys.append(device)
# If the user doesn't want to continue the deploy then stop looping.
if not continue_deploy:
# Print log.
self.logger.info("The deploy has been canceled by the user.")
# Exit for loop.
break
else:
# Print log and show messagebox to user.
self.logger.info("Command textbox is empty!")
messagebox.showwarning(message="Command textbox is empty!")
# Store all device in the bad deploy list.
bad_deploys = devices
# Disconnection from the current device.
connection.disconnect()
# Exit for loop.
break

# Disconnection from device.
connection.disconnect()
else:
# Print log and show messagebox.
self.logger.error(f"Failed to connection to {device['ip_addr']}")
messagebox.showerror(message=f"Couldn't connect to {device['ip_addr']}. Moving on to next device.")
# Append current device to bad deploy list.
bad_deploys.append(device)

return bad_deploys

def mass_ping_button_callback(self) -> None:
"""
This function is triggered everytime the Mass Ping button is pressed. The process for this button click triggers
Expand All @@ -556,20 +694,24 @@ def mass_ping_button_callback(self) -> None:
--------
Nothing
"""
# Create instance variables.
text = []

# Print status to console.
self.logger.info("\n---------------------------------------------------------\nPinging all devices now...\n---------------------------------------------------------")

# Get text from textbox.
text = [self.closest_hop_entry.get()]
# Ping each switch listed in the textbox to get a list containing their status.
# Thread(target=ping_of_death, args=(text, self.ip_list,)).start()

test = []
for selected, device in zip(self.switch_tree_check_values, self.export_info_list):
if selected.get():
test.append(device)
# Check if scan has been ran.
if len(self.switch_tree_check_values) > 0 and any([value.get() for value in self.switch_tree_check_values]):
# Loop through each boolean value and match it to the device ip.
for selected, device in zip(self.switch_tree_check_values, self.selection_devices_list):
if selected.get():
text.append(device["ip_addr"])
else:
# Get text from textbox.
text = [self.closest_hop_entry.get()]

print(test)
# Ping each switch listed in the textbox to get a list containing their status.
Thread(target=ping_of_death, args=(text, self.ip_list,)).start()

def update_window(self) -> None:
"""
Expand Down Expand Up @@ -643,9 +785,10 @@ def update_window(self) -> None:
# Append to list.
self.switch_tree_check_values.append(check_value)
self.switch_tree_checkboxes.append(checkbox)
self.selection_devices_list.append(device)
# Add to textbox.
text_box.window_create("1.0", window=checkbox)
text_box.insert("end", "\n")
text_box.insert("end", " \n")

# Add scrollbar to textbox.
scrolly=tk.Scrollbar(master=self.tree_view_frame, orient='vertical', command=text_box.yview) # Add a scrollbar.
Expand Down
86 changes: 0 additions & 86 deletions test.py

This file was deleted.

0 comments on commit 0a9c5b0

Please sign in to comment.