Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature]Prettify organizer via cmd2 #265

Merged
merged 2 commits into from
Jul 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion federatedscope/organizer/cfg_client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# ---------------------------------------------------------------------- #
# Lobby related (global variable stored in Redis)
# ---------------------------------------------------------------------- #
server_ip = '172.23.27.236'
server_ip = '172.17.138.149'
broker_url = f'redis://{server_ip}:6379/0'
result_backend = f'redis://{server_ip}/0'

Expand Down
181 changes: 129 additions & 52 deletions federatedscope/organizer/client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import cmd
import cmd2
import time
from celery import Celery
from cmd2 import Bg, Fg, style

from federatedscope.core.configs.config import CN
from federatedscope.organizer.cfg_client import server_ip
Expand All @@ -11,55 +12,105 @@
organizer.config_from_object('cfg_client')


class OrganizerClient(cmd.Cmd):
intro = 'Welcome to the FS organizer shell. Type help or ? to list ' \
'commands.\n'
prompt = 'FederatedScope>> '
class OrganizerClient(cmd2.Cmd):
SEVER_CATEGORY = 'Server Related Commands'
ECS_CATEGORY = 'ECS Related Commands'
TASK_CATEGORY = 'Task Related Commands'

# Dictionaries maintained by the client
ecs_dict, room_dict, task_dict = {}, {}, {}
ecs_dict, room_dict = {}, {}
timeout = 10

def __init__(self):
super().__init__(
multiline_commands=['echo'],
include_ipy=True,
)

self.intro = style(
'Welcome to the FS organizer shell. Type help or ? to list '
'commands.\n',
fg=Fg.BLUE,
bg=Bg.WHITE,
bold=True)
self.prompt = 'FederatedScope>> '
self.self_in_py = True
self.default_category = 'Built-in Commands'
self.debug = True
self.foreground_color = Fg.CYAN.name.lower()

def fancy_output(self, out):
return self.poutput(style(out, fg=Fg.GREEN, bg=Bg.WHITE))

# ---------------------------------------------------------------------- #
# SSH Manager related
# ---------------------------------------------------------------------- #
@cmd2.with_category(ECS_CATEGORY)
def do_add_ecs(self, line):
'Add Ecs (ip, user, psw): add_ecs 172.X.X.X root 123'
'Usage: add_ecs ip user psw\n\n' \
'Add ECS to client control list\n\n' \
'required arguments:\n' \
' ip, ip address 172.X.X.X\n' \
' user, user name of ECS\n' \
' psw, password of user\n\n' \
'Example:\n' \
' add_ecs 172.X.X.X root 12345\n'
try:
ip, user, psw = line.split(' ')
key = f"{ip}"
if key in self.ecs_dict:
raise ValueError(f"ECS `{key}` already exists.")
self.ecs_dict[key] = SSHManager(ip, user, psw)
print(f"{self.ecs_dict[key]} added.")
self.fancy_output(f"{self.ecs_dict[key]} added.")
except Exception as error:
print(f"Exception: {error}")
self.pexcept(f"Exception: {error}")

@cmd2.with_category(ECS_CATEGORY)
def do_del_ecs(self, line):
'Delete Ecs (ip): del_ecs 172.X.X.X'
'Usage: del_ecs ip\n\n' \
'Delete ECS from client control list\n\n' \
'required arguments:\n' \
' ip, ip address 172.X.X.X\n\n' \
'Example:\n' \
' del_ecs 172.X.X.X\n'
try:
key = line
print(f"Delete {key}: {self.ecs_dict[key]}.")
self.fancy_output(f"Delete {key}: {self.ecs_dict[key]}.")
# TODO: Delete all tasks
del self.ecs_dict[key]
except Exception as error:
print(f"Exception: {error}")
self.pexcept(f"Exception: {error}")

@cmd2.with_category(ECS_CATEGORY)
def do_display_ecs(self, line):
'Display all saved ECS: display_ecs'
'Usage: display_ecs\n\n' \
'Display saved ECS in client control list\n\n' \
'Example:\n' \
' display_ecs\n'
try:
info = ""
for key, value in self.ecs_dict.items():
info += f"ecs: {key}, info: {value}\n"
print(info)
self.fancy_output(info)
except Exception as error:
print(f"Exception: {error}")
self.pexcept(f"Exception: {error}")

@cmd2.with_category(ECS_CATEGORY)
def do_join_room(self, line):
'Let an ECS join a specific room (ip room_id other_opts): ' \
'join_room 172.X.X.X 0 device 0 distribute.data_idx 2 ...'
'Usage: join_room ip room_id other_opts\n\n' \
'Let an ECS join a specific room\n\n' \
'required arguments:\n' \
' ip, ip address 172.X.X.X\n' \
' room_id, id of the room to join\n' \
' other_opts, other options in FS\n\n' \
'Example:\n' \
' join_room 172.X.X.X 0 device 0 distribute.data_idx 2 ...\n'
try:
line = line.split(' ')
ip, room_id, opts = line[0], line[1], line[2:]
ecs, room = self.ecs_dict[ip], self.room_dict[room_id]
if room['cfg'] == '******':
raise ValueError('Please view_room before joining.')
cfg = CN(room['cfg'])

# Convert necessary configurations
Expand All @@ -75,103 +126,129 @@ def do_join_room(self, line):
value = f'{i}'.replace(' ', '')
command += f' "{value}"'
command = command[1:]
# TODO: manage the process
stdout, stderr = ecs.exec_python_cmd(f'nohup python '
f'federatedscope/main.py'
f' {command} > /dev/null '
f'2>&1 &')
print(stdout, stderr)
pid = ecs.launch_task(command)
self.fancy_output(f'{ecs.ip}({pid}) launched,')
except Exception as error:
print(f"Exception: {error}")
self.pexcept(f"Exception: {error}")

# ---------------------------------------------------------------------- #
# Task manager related
# ---------------------------------------------------------------------- #
@cmd2.with_category(TASK_CATEGORY)
def do_display_task(self, line):
'Usage: display_task\n\n' \
'Display all running tasks in client task list\n\n' \
'Example:\n' \
' display_task\n'
# TODO: add abort, check status, etc
print(self.task_dict)
for i in self.ecs_dict:
self.fancy_output(f'{self.ecs_dict[i].ip}:'
f' {self.ecs_dict[i].tasks}')

# ---------------------------------------------------------------------- #
# Server related messages
# ---------------------------------------------------------------------- #
@cmd2.with_category(SEVER_CATEGORY)
def do_create_room(self, line):
'Create FS room in server with specific command (command, psw): ' \
'create_room --cfg ../../federatedscope/example_configs' \
'/distributed_femnist_server.yaml 123'
'Usage: create_room command psw\n\n' \
'Create FS room in server with specific command\n\n' \
'required arguments:\n' \
' command, extra command to launch FS\n' \
' psw, password for room \n\n' \
'Example:\n' \
' create_room --cfg ../../federatedscope/example_configs' \
'/distributed_femnist_server.yaml 12345\n'
try:
global organizer
psw = line.split(' ')[-1]
command = line[:-len(psw) - 1]
result = organizer.send_task('server.create_room', [command, psw])
cnt = 0
while (not result.ready()) and cnt < self.timeout:
print('Waiting for response... (will re-try in 1s)')
self.fancy_output(
'Waiting for response... (will re-try in 1s)')
time.sleep(1)
cnt += 1
print(result.get(timeout=1))
self.fancy_output(result.get(timeout=1))
except Exception as error:
print(f"Exception: {error}")
self.pexcept(f"Exception: {error}")

@cmd2.with_category(SEVER_CATEGORY)
def do_update_room(self, line):
'Fetch all FS rooms from Lobby: update_room'
'Usage: update_room\n\n' \
'Fetch all FS rooms from Lobby (will forget all saved room)\n\n' \
'Example:\n' \
' update_room\n'
try:
global organizer
print('Forget all saved room due to `update_room`.')
self.fancy_output('Forget all saved room due to `update_room`.')
result = organizer.send_task('server.display_room')
cnt = 0
while (not result.ready()) and cnt < self.timeout:
print('Waiting for response... (will re-try in 1s)')
self.fancy_output(
'Waiting for response... (will re-try in 1s)')
time.sleep(1)
cnt += 1
self.room_dict = result.get(timeout=1)
info = ""
for key, value in self.room_dict.items():
tmp = f"room_id: {key}, info: {value}\n"
info += tmp
print(info)
self.fancy_output(info)
except Exception as error:
print(f"Exception: {error}")
self.pexcept(f"Exception: {error}")

@cmd2.with_category(SEVER_CATEGORY)
def do_view_room(self, line):
'View specific FS room (room_id, psw, verbose): view_room 0 123 0\n' \
'verbose 0: print no information\n' \
'verbose 1: print information of a specific room\n' \
'verbose 2: print information of all the rooms'
'Usage: view_room room_id psw verbose\n\n' \
'View specific FS room\n\n' \
'required arguments:\n' \
' room_id, id of the room to view\n' \
' psw, password for room \n' \
' verbose,\n' \
' 0: print no information\n' \
' 1: print information of a specific room\n' \
' 2: print information of all the rooms\n\n' \
'Example:\n' \
' view_room 0 12345 0\n'
try:
global organizer
room_id, psw, verbose = line.split(' ')
result = organizer.send_task('server.view_room', [room_id, psw])
cnt = 0
while (not result.ready()) and cnt < self.timeout:
print('Waiting for response... (will re-try in 1s)')
self.fancy_output(
'Waiting for response... (will re-try in 1s)')
time.sleep(1)
cnt += 1
info = result.get(timeout=1)
if isinstance(info, dict):
self.room_dict[room_id] = info
print(f'Room {room_id} has been updated to joinable.')
self.fancy_output(
f'Room {room_id} has been updated to joinable.')
if verbose == '1':
print(info)
self.fancy_output(info)
elif verbose == '2':
print(self.room_dict)
self.fancy_output(self.room_dict)
else:
print(info)
self.fancy_output(info)
except Exception as error:
print(f"Exception: {error}")
self.pexcept(f"Exception: {error}")

@cmd2.with_category(SEVER_CATEGORY)
def do_shut_down(self, line):
'Shut down all rooms and quit: shut_down'
'Usage: shut_down\n\n' \
'Shut down all rooms and quit\n\n' \
'Example:\n' \
' shut_down\n'
global organizer
result = organizer.send_task('server.shut_down')
cnt = 0
while (not result.ready()) and cnt < self.timeout:
print('Waiting for response... (will re-try in 1s)')
self.fancy_output('Waiting for response... (will re-try in 1s)')
time.sleep(1)
cnt += 1
print(result.get(timeout=1))
return True

def do_quit(self, line):
self.fancy_output(result.get(timeout=1))
return True


Expand Down
31 changes: 23 additions & 8 deletions federatedscope/organizer/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def __init__(self, ip, user, psw, ssh_port=22):
self.ssh_port = ssh_port
self.ssh, self.trans = None, None
self.setup_fs()
self.tasks = set()

def _connect(self):
self.trans = paramiko.Transport((self.ip, self.ssh_port))
Expand All @@ -31,7 +32,7 @@ def _connect(self):
def _disconnect(self):
self.trans.close()

def exec_cmd(self, command):
def _exec_cmd(self, command):
if self.trans is None or self.ssh is None:
self._connect()
command = f'source ~/.bashrc; cd ~; {command}'
Expand All @@ -40,7 +41,7 @@ def exec_cmd(self, command):
stderr = stderr.read().decode('ascii').strip("\n")
return stdout, stderr

def exec_python_cmd(self, command):
def _exec_python_cmd(self, command):
if self.trans is None or self.ssh is None:
self._connect()
command = f'source ~/.bashrc; conda activate {env_name}; ' \
Expand All @@ -55,16 +56,16 @@ def _check_conda(self):
Check and install conda env.
"""
# Check conda
conda, _ = self.exec_cmd('which conda')
conda, _ = self._exec_cmd('which conda')
if conda is None:
logger.exception('No conda, please install conda first.')
# TODO: Install conda here
return False

# Check env & FS
output, err = self.exec_cmd(f'conda activate {env_name}; '
f'python -c "import federatedscope; print('
f'federatedscope.__version__)"')
output, err = self._exec_cmd(f'conda activate {env_name}; '
f'python -c "import federatedscope; '
f'print(federatedscope.__version__)"')
if err:
logger.error(err)
# TODO: Install FS env here
Expand All @@ -80,22 +81,36 @@ def _check_source(self):
Check and download FS repo.
"""
fs_path = os.path.join(root_path, 'FederatedScope', '.git')
output, _ = self.exec_cmd(f'[ -d {fs_path} ] && echo "Found" || '
f'echo "Not found"')
output, _ = self._exec_cmd(f'[ -d {fs_path} ] && echo "Found" || '
f'echo "Not found"')
if output == 'Not found':
# TODO: git clone here
logger.exception(f'Repo not find in {fs_path}.')
return False
logger.info(f'FS repo Found in {root_path}.')
return True

def _check_task_status(self):
"""
Check task status.
"""
pass

def setup_fs(self):
logger.info("Checking environment, please wait...")
if not self._check_conda():
raise Exception('The environment is not configured properly.')
if not self._check_source():
raise Exception('The FS repo is not configured properly.')

def launch_task(self, command):
self._check_task_status()
stdout, _ = self._exec_python_cmd(f'nohup python '
f'federatedscope/main.py {command} '
f'> /dev/null 2>&1 & echo $!')
self.tasks.add(stdout)
return stdout


def anonymize(info, mask):
for key, value in info.items():
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

dev_requires = test_requires + ['pre-commit']

org_requires = ['paramiko==2.11.0', 'celery[redis]']
org_requires = ['paramiko==2.11.0', 'celery[redis]', 'cmd2']

benchmark_hpo_requires = [
'configspace==0.5.0', 'hpbandster==0.7.4', 'smac==1.3.3', 'optuna==2.10.0'
Expand Down