Skip to content

Commit

Permalink
[CoreEngine] set the name of all monitor processes, remove the redund…
Browse files Browse the repository at this point in the history
…ant binding codes for deployment devices, change the account manager,
  • Loading branch information
fedml-alex committed Jun 21, 2024
1 parent fd257b8 commit d7481be
Show file tree
Hide file tree
Showing 13 changed files with 185 additions and 328 deletions.
2 changes: 2 additions & 0 deletions python/fedml/computing/scheduler/master/base_master_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ def login(
if communication_manager is None:
self.protocol_mgr.start()

return login_result

@staticmethod
def logout():
GeneralConstants.cleanup_run_process(None, is_master=True)
Expand Down

This file was deleted.

This file was deleted.

71 changes: 36 additions & 35 deletions python/fedml/computing/scheduler/scheduler_core/account_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,15 @@ class FedMLAccountManager(Singleton):
DEVICE_ID_DOCKER_HUB_TAG = ".DockerHub"

def __init__(self):
if not hasattr(self, "agent_args"):
self.agent_args = None
pass

@staticmethod
def get_instance():
return FedMLAccountManager()

def login(self, user_id, api_key="", device_id=None, os_name=None, role=None, runner_cmd=None):
# Build the agent args
self.build_agent_args(
agent_args = self.build_agent_args(
user_id, api_key=api_key, device_id=device_id, os_name=os_name, role=role, runner_cmd=runner_cmd
)

Expand Down Expand Up @@ -95,8 +94,8 @@ def login(self, user_id, api_key="", device_id=None, os_name=None, role=None, ru
# noinspection PyBroadException
try:
edge_id, user_name, extra_url, general_edge_id = FedMLAccountManager.bind_account_and_device_id(
service_config["ml_ops_config"]["EDGE_BINDING_URL"], self.agent_args.account_id,
self.agent_args.unique_device_id, self.agent_args.os_name,
service_config["ml_ops_config"]["EDGE_BINDING_URL"], agent_args.account_id,
agent_args.unique_device_id, agent_args.os_name,
api_key=api_key, role=role
)
if edge_id > 0:
Expand All @@ -120,13 +119,13 @@ def login(self, user_id, api_key="", device_id=None, os_name=None, role=None, ru
return None

# Fill the bound result to agent args.
self.fill_argent_args(
log_server_url=log_server_url, server_id=edge_id,
agent_args = self.fill_argent_args(
agent_args, log_server_url=log_server_url, server_id=edge_id,
edge_id=edge_id, general_edge_id=general_edge_id,
user_name=user_name, extra_url=extra_url,
agent_config=service_config)

return self.agent_args
return agent_args

def build_agent_args(self, user_id, api_key=None, device_id=None, os_name=None, role=None, runner_cmd=None):
# Generate the suffix for device based on the role
Expand Down Expand Up @@ -159,32 +158,31 @@ def build_agent_args(self, user_id, api_key=None, device_id=None, os_name=None,

# Build the agent args
version = fedml.get_env_version()
if self.agent_args is None:
self.agent_args = AgentArgs()
self.agent_args.role = role
self.agent_args.account_id = user_id
self.agent_args.api_key = api_key
self.agent_args.current_running_dir = GeneralConstants.get_deploy_fedml_home_dir(is_master=is_master) \
agent_args = AgentArgs()
agent_args.role = role
agent_args.account_id = user_id
agent_args.api_key = api_key
agent_args.current_running_dir = GeneralConstants.get_deploy_fedml_home_dir(is_master=is_master) \
if is_deploy else GeneralConstants.get_launch_fedml_home_dir(is_master=is_master)
sys_name = platform.system()
if sys_name == "Darwin":
sys_name = "MacOS"
self.agent_args.os_name = sys_name if os_name is None or os_name == "" else os_name
self.agent_args.version = version
self.agent_args.log_file_dir = GeneralConstants.get_deploy_log_file_dir(is_master=is_master) \
agent_args.os_name = sys_name if os_name is None or os_name == "" else os_name
agent_args.version = version
agent_args.log_file_dir = GeneralConstants.get_deploy_log_file_dir(is_master=is_master) \
if is_deploy else GeneralConstants.get_launch_log_file_dir(is_master=is_master)
is_from_docker = False
if device_id is not None and device_id != "0":
self.agent_args.current_device_id = device_id
agent_args.current_device_id = device_id
else:
data_dir = GeneralConstants.get_deploy_data_dir(is_master=is_master) \
if is_deploy else GeneralConstants.get_launch_data_dir(is_master=is_master)
is_gpu_provider = True if role == FedMLAccountManager.ROLE_GPU_PROVIDER else False
self.agent_args.current_device_id = FedMLAccountManager.get_device_id(
agent_args.current_device_id = FedMLAccountManager.get_device_id(
data_dir=data_dir, use_machine_id=is_gpu_provider)
self.agent_args.device_id = self.agent_args.current_device_id
self.agent_args.config_version = version
self.agent_args.cloud_region = ""
agent_args.device_id = agent_args.current_device_id
agent_args.config_version = version
agent_args.cloud_region = ""

# Check if it is running in the fedml docker hub
is_from_fedml_docker_hub = False
Expand All @@ -196,26 +194,29 @@ def build_agent_args(self, user_id, api_key=None, device_id=None, os_name=None,
# Build unique device id
docker_tag = FedMLAccountManager.DEVICE_ID_DOCKER_TAG if is_from_docker else ""
docker_tag = FedMLAccountManager.DEVICE_ID_DOCKER_HUB_TAG if is_from_fedml_docker_hub else docker_tag
unique_device_id = f"{self.agent_args.current_device_id}@{self.agent_args.os_name}" \
unique_device_id = f"{agent_args.current_device_id}@{agent_args.os_name}" \
f"{docker_tag}{device_id_suffix}"
if role == FedMLAccountManager.ROLE_CLOUD_SERVER:
unique_device_id = self.agent_args.current_device_id
unique_device_id = agent_args.current_device_id

# Set the unique device id
self.agent_args.is_from_docker = is_from_docker or is_from_fedml_docker_hub
self.agent_args.unique_device_id = unique_device_id
self.agent_args.runner_cmd = runner_cmd
agent_args.is_from_docker = is_from_docker or is_from_fedml_docker_hub
agent_args.unique_device_id = unique_device_id
agent_args.runner_cmd = runner_cmd

return agent_args

def fill_argent_args(
self, log_server_url=None, server_id=None, edge_id=None,
self, agent_args, log_server_url=None, server_id=None, edge_id=None,
user_name=None, extra_url=None, general_edge_id=None, agent_config=None):
self.agent_args.log_server_url = log_server_url
self.agent_args.server_id = server_id
self.agent_args.edge_id = edge_id
self.agent_args.user_name = user_name
self.agent_args.extra_url = extra_url
self.agent_args.general_edge_id = general_edge_id
self.agent_args.agent_config = agent_config
agent_args.log_server_url = log_server_url
agent_args.server_id = server_id
agent_args.edge_id = edge_id
agent_args.user_name = user_name
agent_args.extra_url = extra_url
agent_args.general_edge_id = general_edge_id
agent_args.agent_config = agent_config
return agent_args

@staticmethod
def write_login_failed_file(is_client=True):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class GeneralConstants:
FEDML_MESSAGE_CENTER_SENDER_TAG = "message-center-sender"
FEDML_STATUS_CENTER_TAG = "status-center"
FEDML_LOG_PROCESS_TAG = "log"
FEDML_MONITOR_PROCESS_TAG = "monitor"

FEDML_TOPIC_STATUS_CENTER_STOP = "anywhere/status_center/stop"

Expand Down Expand Up @@ -232,7 +233,9 @@ def get_payload_complete_job(run_id, server_id):

@staticmethod
def get_process_name(process_tag, run_id=None, edge_id=None):
return f"{GeneralConstants.FEDML_PROCESS_NAME_PREFIX}{process_tag}-run-{run_id}-edge-{edge_id}"
return f'{GeneralConstants.FEDML_PROCESS_NAME_PREFIX}{process_tag}'\
f'{"-run-" + str(run_id) if run_id is not None and int(run_id) != 0 else ""}'\
f'{"-edge-" + str(edge_id) if edge_id is not None else ""}'

@staticmethod
def get_process_name_with_prefix(process_prefix, run_id=None, edge_id=None):
Expand Down Expand Up @@ -285,4 +288,7 @@ def get_message_center_sender_process_name(message_center_name):
def get_status_center_process_name(status_center_tag):
return f"{GeneralConstants.FEDML_PROCESS_NAME_PREFIX}{GeneralConstants.FEDML_STATUS_CENTER_TAG}-{status_center_tag}"


@staticmethod
def get_monitor_process_name(monitor_tag, run_id, edge_id):
return GeneralConstants.get_process_name(
f"{GeneralConstants.FEDML_MONITOR_PROCESS_TAG}-{monitor_tag}", run_id, edge_id)
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,7 @@ def start_listener(
self.listener_message_event = multiprocessing.Event()
self.listener_message_event.clear()
self.listener_agent_config = agent_config
message_runner = self.get_message_runner()
# message_runner = self
message_runner = self
message_runner.listener_agent_config = agent_config
process_name = GeneralConstants.get_message_center_listener_process_name(message_center_name)
if platform.system() == "Windows":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,7 @@ def start_status_center(self, sender_message_center_queue=None,
self.status_event.clear()
self.status_sender_message_center_queue = sender_message_center_queue
self.status_listener_message_center_queue = listener_message_center_queue
self.status_runner = self.get_status_runner()
#self.status_runner = self
self.status_runner = self
process_name = GeneralConstants.get_status_center_process_name(
f'{"deploy" if self.is_deployment_status_center else "launch"}_'
f'{"slave" if is_slave_agent else "master"}_agent')
Expand Down
Loading

0 comments on commit d7481be

Please sign in to comment.